soup-message-io: use gio streams rather than SoupSocket
[platform/upstream/libsoup.git] / libsoup / soup-message-io.c
1 /* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
2 /*
3  * soup-message-io.c: HTTP message I/O
4  *
5  * Copyright (C) 2000-2003, Ximian, Inc.
6  */
7
8 #ifdef HAVE_CONFIG_H
9 #include "config.h"
10 #endif
11
12 #include <stdlib.h>
13 #include <string.h>
14
15 #include "soup-body-input-stream.h"
16 #include "soup-body-output-stream.h"
17 #include "soup-connection.h"
18 #include "soup-filter-input-stream.h"
19 #include "soup-message.h"
20 #include "soup-message-private.h"
21 #include "soup-message-queue.h"
22 #include "soup-misc.h"
23 #include "soup-socket.h"
24
/* Which end of the HTTP exchange an I/O state machine is driving.
 * The mode selects which header set is consulted when sizing bodies
 * and how informational (1xx) messages are handled.
 */
typedef enum {
        /* Writes using request_headers, reads using response_headers. */
        SOUP_MESSAGE_IO_CLIENT = 0,
        /* Writes using response_headers, reads using request_headers. */
        SOUP_MESSAGE_IO_SERVER = 1
} SoupMessageIOMode;
29
/* Progress of one direction (read or write) of a message's I/O.
 *
 * The numeric order is significant: code in this file compares states
 * with "<" and "<=" (for example "read_state < ..._FINISHING"), so new
 * values must not be inserted out of sequence.
 */
typedef enum {
        SOUP_MESSAGE_IO_STATE_NOT_STARTED, /* this direction has not begun */
        SOUP_MESSAGE_IO_STATE_HEADERS,     /* transferring the header block */
        SOUP_MESSAGE_IO_STATE_BLOCKING,    /* parked until the other direction advances */
        SOUP_MESSAGE_IO_STATE_BODY_START,  /* about to set up the body stream */
        SOUP_MESSAGE_IO_STATE_BODY,        /* transferring body data */
        SOUP_MESSAGE_IO_STATE_BODY_DATA,   /* a chunk finished; do per-chunk bookkeeping */
        SOUP_MESSAGE_IO_STATE_BODY_DONE,   /* body complete; close the body stream */
        SOUP_MESSAGE_IO_STATE_FINISHING,   /* last step before DONE */
        SOUP_MESSAGE_IO_STATE_DONE         /* nothing further to do */
} SoupMessageIOState;

/* True when @state is one in which I/O can be pushed forward, i.e. it
 * is none of NOT_STARTED, BLOCKING or DONE.
 *
 * The argument is parenthesized so that an arbitrary expression (such
 * as a conditional expression) can be passed safely. Note that it is
 * still evaluated up to three times, so it must be free of side
 * effects.
 */
#define SOUP_MESSAGE_IO_STATE_ACTIVE(state) \
        ((state) != SOUP_MESSAGE_IO_STATE_NOT_STARTED && \
         (state) != SOUP_MESSAGE_IO_STATE_BLOCKING && \
         (state) != SOUP_MESSAGE_IO_STATE_DONE)
46
47 typedef struct {
48         SoupMessageQueueItem *item;
49         SoupMessageIOMode     mode;
50         GCancellable         *cancellable;
51
52         SoupSocket             *sock;
53         SoupFilterInputStream  *istream;
54         GInputStream           *body_istream;
55         GOutputStream          *ostream;
56         GOutputStream          *body_ostream;
57         GMainContext           *async_context;
58         gboolean                blocking;
59
60         SoupMessageIOState    read_state;
61         SoupEncoding          read_encoding;
62         GByteArray           *read_header_buf;
63         SoupMessageBody      *read_body;
64         goffset               read_length;
65
66         gboolean              need_content_sniffed, need_got_chunk;
67         SoupMessageBody      *sniff_data;
68
69         SoupMessageIOState    write_state;
70         SoupEncoding          write_encoding;
71         GString              *write_buf;
72         SoupMessageBody      *write_body;
73         SoupBuffer           *write_chunk;
74         goffset               write_body_offset;
75         goffset               write_length;
76         goffset               written;
77
78         GSource *io_source;
79         GSource *unpause_source;
80         gboolean paused;
81
82         SoupMessageGetHeadersFn   get_headers_cb;
83         SoupMessageParseHeadersFn parse_headers_cb;
84         gpointer                  header_data;
85         SoupMessageCompletionFn   completion_cb;
86         gpointer                  completion_data;
87 } SoupMessageIOData;
88         
89
90 /* Put these around callback invocation if there is code afterward
91  * that depends on the IO having not been cancelled.
92  */
/* Never expanded; its lone "{" only balances the braces below for
 * editors that match braces textually.
 */
#define dummy_to_make_emacs_happy {

/* Opens a scope, declares "cancelled" and takes a reference on msg so
 * the message survives the callback. Must be paired with one of the
 * RETURN macros below, which close the scope again. Expects "msg",
 * "priv" and "io" to be in scope at the point of use.
 */
#define SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK \
        { \
                gboolean cancelled; \
                g_object_ref (msg);

/* The I/O counts as cancelled when priv->io_data no longer points at
 * the io captured before the callback. io->paused is read only when
 * not cancelled, thanks to the short-circuit "||".
 */
#define SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED \
                cancelled = (priv->io_data != io); \
                g_object_unref (msg); \
                if (cancelled || io->paused) \
                        return; \
        }

/* Same as above, for functions that return a value. */
#define SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED(val) \
                cancelled = (priv->io_data != io); \
                g_object_unref (msg); \
                if (cancelled || io->paused) \
                        return val; \
        }

/* Number of bytes by which the header buffer grows per line read. */
#define RESPONSE_BLOCK_SIZE 8192
99
100 void
101 soup_message_io_cleanup (SoupMessage *msg)
102 {
103         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
104         SoupMessageIOData *io;
105
106         soup_message_io_stop (msg);
107
108         io = priv->io_data;
109         if (!io)
110                 return;
111         priv->io_data = NULL;
112
113         if (io->sock)
114                 g_object_unref (io->sock);
115         if (io->istream)
116                 g_object_remove_weak_pointer (G_OBJECT (io->istream), (gpointer *)&io->istream);
117         if (io->ostream)
118                 g_object_remove_weak_pointer (G_OBJECT (io->ostream), (gpointer *)&io->ostream);
119         if (io->body_istream)
120                 g_object_unref (io->body_istream);
121         if (io->body_ostream)
122                 g_object_unref (io->body_ostream);
123         if (io->async_context)
124                 g_main_context_unref (io->async_context);
125         if (io->item)
126                 soup_message_queue_item_unref (io->item);
127
128         g_byte_array_free (io->read_header_buf, TRUE);
129
130         g_string_free (io->write_buf, TRUE);
131         if (io->write_chunk)
132                 soup_buffer_free (io->write_chunk);
133
134         if (io->sniff_data)
135                 soup_message_body_free (io->sniff_data);
136
137         g_slice_free (SoupMessageIOData, io);
138 }
139
140 void
141 soup_message_io_stop (SoupMessage *msg)
142 {
143         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
144         SoupMessageIOData *io = priv->io_data;
145
146         if (!io)
147                 return;
148
149         if (io->io_source) {
150                 g_source_destroy (io->io_source);
151                 io->io_source = NULL;
152         }
153
154         if (io->unpause_source) {
155                 g_source_destroy (io->unpause_source);
156                 io->unpause_source = NULL;
157         }
158
159         if (io->read_state < SOUP_MESSAGE_IO_STATE_FINISHING)
160                 soup_socket_disconnect (io->sock);
161 }
162
163 void
164 soup_message_io_finished (SoupMessage *msg)
165 {
166         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
167         SoupMessageIOData *io = priv->io_data;
168         SoupMessageCompletionFn completion_cb = io->completion_cb;
169         gpointer completion_data = io->completion_data;
170
171         g_object_ref (msg);
172         soup_message_io_cleanup (msg);
173         if (completion_cb)
174                 completion_cb (msg, completion_data);
175         g_object_unref (msg);
176 }
177
178 static gboolean
179 request_is_idempotent (SoupMessage *msg)
180 {
181         /* FIXME */
182         return (msg->method == SOUP_METHOD_GET);
183 }
184
185 static void
186 io_error (SoupSocket *sock, SoupMessage *msg, GError *error)
187 {
188         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
189         SoupMessageIOData *io = priv->io_data;
190
191         if (error && error->domain == G_TLS_ERROR) {
192                 soup_message_set_status_full (msg,
193                                               SOUP_STATUS_SSL_FAILED,
194                                               error->message);
195         } else if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
196                    io->read_state <= SOUP_MESSAGE_IO_STATE_HEADERS &&
197                    io->read_header_buf->len == 0 &&
198                    soup_connection_get_ever_used (io->item->conn) &&
199                    !g_error_matches (error, G_IO_ERROR, G_IO_ERROR_TIMED_OUT) &&
200                    request_is_idempotent (msg)) {
201                 /* Connection got closed, but we can safely try again */
202                 io->item->state = SOUP_MESSAGE_RESTARTING;
203         } else if (!SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code))
204                 soup_message_set_status (msg, SOUP_STATUS_IO_ERROR);
205
206         if (error)
207                 g_error_free (error);
208
209         soup_message_io_finished (msg);
210 }
211
212 static gboolean
213 io_handle_sniffing (SoupMessage *msg, gboolean done_reading)
214 {
215         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
216         SoupMessageIOData *io = priv->io_data;
217         SoupBuffer *sniffed_buffer;
218         char *sniffed_mime_type;
219         GHashTable *params = NULL;
220
221         if (!priv->sniffer)
222                 return TRUE;
223
224         if (!io->sniff_data) {
225                 io->sniff_data = soup_message_body_new ();
226                 io->need_content_sniffed = TRUE;
227         }
228
229         if (io->need_content_sniffed) {
230                 if (io->sniff_data->length < priv->bytes_for_sniffing &&
231                     !done_reading)
232                         return TRUE;
233
234                 io->need_content_sniffed = FALSE;
235                 sniffed_buffer = soup_message_body_flatten (io->sniff_data);
236                 sniffed_mime_type = soup_content_sniffer_sniff (priv->sniffer, msg, sniffed_buffer, &params);
237
238                 SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK;
239                 soup_message_content_sniffed (msg, sniffed_mime_type, params);
240                 g_free (sniffed_mime_type);
241                 if (params)
242                         g_hash_table_destroy (params);
243                 if (sniffed_buffer)
244                         soup_buffer_free (sniffed_buffer);
245                 SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED (FALSE);
246         }
247
248         if (io->need_got_chunk) {
249                 io->need_got_chunk = FALSE;
250                 sniffed_buffer = soup_message_body_flatten (io->sniff_data);
251
252                 SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK;
253                 soup_message_got_chunk (msg, sniffed_buffer);
254                 soup_buffer_free (sniffed_buffer);
255                 SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED (FALSE);
256         }
257
258         return TRUE;
259 }
260
261 static gboolean
262 read_headers (SoupMessage *msg, GCancellable *cancellable, GError **error)
263 {
264         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
265         SoupMessageIOData *io = priv->io_data;
266         gssize nread, old_len;
267         gboolean got_lf;
268
269         while (1) {
270                 old_len = io->read_header_buf->len;
271                 g_byte_array_set_size (io->read_header_buf, old_len + RESPONSE_BLOCK_SIZE);
272                 nread = soup_filter_input_stream_read_line (io->istream,
273                                                             io->read_header_buf->data + old_len,
274                                                             RESPONSE_BLOCK_SIZE,
275                                                             io->blocking,
276                                                             &got_lf,
277                                                             cancellable, error);
278                 io->read_header_buf->len = old_len + MAX (nread, 0);
279                 if (nread == 0)
280                         io_error (io->sock, msg, NULL);
281                 if (nread <= 0)
282                         return FALSE;
283
284                 if (got_lf) {
285                         if (nread == 1 && old_len >= 2 &&
286                             !strncmp ((char *)io->read_header_buf->data +
287                                       io->read_header_buf->len - 2,
288                                       "\n\n", 2))
289                                 break;
290                         else if (nread == 2 && old_len >= 3 &&
291                                  !strncmp ((char *)io->read_header_buf->data +
292                                            io->read_header_buf->len - 3,
293                                            "\n\r\n", 3))
294                                 break;
295                 }
296         }
297
298         /* We need to "rewind" io->read_header_buf back one line.
299          * That SHOULD be two characters (CR LF), but if the
300          * web server was stupid, it might only be one.
301          */
302         if (io->read_header_buf->len < 3 ||
303             io->read_header_buf->data[io->read_header_buf->len - 2] == '\n')
304                 io->read_header_buf->len--;
305         else
306                 io->read_header_buf->len -= 2;
307         io->read_header_buf->data[io->read_header_buf->len] = '\0';
308
309         return TRUE;
310 }
311
312 static SoupBuffer *
313 content_decode_one (SoupBuffer *buf, GConverter *converter, GError **error)
314 {
315         gsize outbuf_length, outbuf_used, outbuf_cur, input_used, input_cur;
316         char *outbuf;
317         GConverterResult result;
318         gboolean dummy_zlib_header_used = FALSE;
319
320         outbuf_length = MAX (buf->length * 2, 1024);
321         outbuf = g_malloc (outbuf_length);
322         outbuf_cur = input_cur = 0;
323
324         do {
325                 result = g_converter_convert (
326                         converter,
327                         buf->data + input_cur, buf->length - input_cur,
328                         outbuf + outbuf_cur, outbuf_length - outbuf_cur,
329                         0, &input_used, &outbuf_used, error);
330                 input_cur += input_used;
331                 outbuf_cur += outbuf_used;
332
333                 if (g_error_matches (*error, G_IO_ERROR, G_IO_ERROR_NO_SPACE) ||
334                     (!*error && outbuf_cur == outbuf_length)) {
335                         g_clear_error (error);
336                         outbuf_length *= 2;
337                         outbuf = g_realloc (outbuf, outbuf_length);
338                 } else if (input_cur == 0 &&
339                            !dummy_zlib_header_used &&
340                            G_IS_ZLIB_DECOMPRESSOR (converter) &&
341                            g_error_matches (*error, G_IO_ERROR, G_IO_ERROR_INVALID_DATA)) {
342
343                         GZlibCompressorFormat format;
344                         g_object_get (G_OBJECT (converter), "format", &format, NULL);
345
346                         if (format == G_ZLIB_COMPRESSOR_FORMAT_ZLIB) {
347                                 /* Some servers (especially Apache with mod_deflate)
348                                  * return RAW compressed data without the zlib headers
349                                  * when the client claims to support deflate. For
350                                  * those cases use a dummy header (stolen from
351                                  * Mozilla's nsHTTPCompressConv.cpp) and try to
352                                  * continue uncompressing data.
353                                  */
354                                 static char dummy_zlib_header[2] = { 0x78, 0x9C };
355
356                                 g_converter_reset (converter);
357                                 result = g_converter_convert (converter,
358                                                               dummy_zlib_header, sizeof(dummy_zlib_header),
359                                                               outbuf + outbuf_cur, outbuf_length - outbuf_cur,
360                                                               0, &input_used, &outbuf_used, NULL);
361                                 dummy_zlib_header_used = TRUE;
362                                 if (result == G_CONVERTER_CONVERTED) {
363                                         g_clear_error (error);
364                                         continue;
365                                 }
366                         }
367
368                         g_free (outbuf);
369                         return NULL;
370
371                 } else if (*error) {
372                         /* GZlibDecompressor can't ever return
373                          * G_IO_ERROR_PARTIAL_INPUT unless we pass it
374                          * input_length = 0, which we don't. Other
375                          * converters might of course, so eventually
376                          * this code needs to be rewritten to deal
377                          * with that.
378                          */
379                         g_free (outbuf);
380                         return NULL;
381                 }
382         } while (input_cur < buf->length && result != G_CONVERTER_FINISHED);
383
384         if (outbuf_cur)
385                 return soup_buffer_new (SOUP_MEMORY_TAKE, outbuf, outbuf_cur);
386         else {
387                 g_free (outbuf);
388                 return NULL;
389         }
390 }
391
392 static SoupBuffer *
393 content_decode (SoupMessage *msg, SoupBuffer *buf)
394 {
395         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
396         GConverter *decoder;
397         SoupBuffer *decoded;
398         GError *error = NULL;
399         GSList *d;
400
401         for (d = priv->decoders; d; d = d->next) {
402                 decoder = d->data;
403
404                 decoded = content_decode_one (buf, decoder, &error);
405                 if (error) {
406                         if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_FAILED))
407                                 g_warning ("Content-Decoding error: %s\n", error->message);
408                         g_error_free (error);
409
410                         soup_message_set_flags (msg, priv->msg_flags & ~SOUP_MESSAGE_CONTENT_DECODED);
411                         break;
412                 }
413                 if (buf)
414                         soup_buffer_free (buf);
415
416                 if (decoded)
417                         buf = decoded;
418                 else
419                         return NULL;
420         }
421
422         return buf;
423 }
424
425 /*
426  * There are two request/response formats: the basic request/response,
427  * possibly with one or more unsolicited informational responses (such
428  * as the WebDAV "102 Processing" response):
429  *
430  *     Client                            Server
431  *      W:HEADERS  / R:NOT_STARTED    ->  R:HEADERS  / W:NOT_STARTED
432  *      W:BODY     / R:NOT_STARTED    ->  R:BODY     / W:NOT_STARTED
433  *     [W:DONE     / R:HEADERS (1xx)  <-  R:DONE     / W:HEADERS (1xx) ...]
434  *      W:DONE     / R:HEADERS        <-  R:DONE     / W:HEADERS
435  *      W:DONE     / R:BODY           <-  R:DONE     / W:BODY
436  *      W:DONE     / R:DONE               R:DONE     / W:DONE
437  *     
438  * and the "Expect: 100-continue" request/response, with the client
439  * blocking halfway through its request, and then either continuing or
440  * aborting, depending on the server response:
441  *
442  *     Client                            Server
443  *      W:HEADERS  / R:NOT_STARTED    ->  R:HEADERS  / W:NOT_STARTED
444  *      W:BLOCKING / R:HEADERS        <-  R:BLOCKING / W:HEADERS
445  *     [W:BODY     / R:BLOCKING       ->  R:BODY     / W:BLOCKING]
446  *     [W:DONE     / R:HEADERS        <-  R:DONE     / W:HEADERS]
447  *      W:DONE     / R:BODY           <-  R:DONE     / W:BODY
448  *      W:DONE     / R:DONE               R:DONE     / W:DONE
449  */
450
451 /* Attempts to push forward the writing side of @msg's I/O. Returns
452  * %TRUE if it manages to make some progress, and it is likely that
453  * further progress can be made. Returns %FALSE if it has reached a
454  * stopping point of some sort (need input from the application,
455  * socket not writable, write is complete, etc).
456  */
/* One step of the write-side state machine, selected by
 * io->write_state. Each case performs a single step and either
 * returns FALSE (nothing more can be done right now, or a stream
 * call failed and *error describes why) or breaks out of the switch
 * to return TRUE so that the caller can invoke it again.
 *
 * Several cases also move io->read_state, since the two directions
 * hand control to each other (see the diagrams above).
 */
457 static gboolean
458 io_write (SoupMessage *msg, GCancellable *cancellable, GError **error)
459 {
460         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
461         SoupMessageIOData *io = priv->io_data;
462         SoupBuffer *chunk;
463         gssize nwrote;
464
465         switch (io->write_state) {
466         case SOUP_MESSAGE_IO_STATE_NOT_STARTED:
467         case SOUP_MESSAGE_IO_STATE_BLOCKING:
            /* Nothing to write in these states. */
468                 return FALSE;
469
470
471         case SOUP_MESSAGE_IO_STATE_HEADERS:
            /* An empty write_buf means the headers have not been
             * serialized yet; the callback fills the buffer and
             * reports the body encoding to use.
             */
472                 if (!io->write_buf->len) {
473                         io->get_headers_cb (msg, io->write_buf,
474                                             &io->write_encoding,
475                                             io->header_data);
476                 }
477
            /* io->written persists across calls, so after an early
             * return the next call resumes where this one stopped.
             */
478                 while (io->written < io->write_buf->len) {
479                         nwrote = g_pollable_stream_write (io->ostream,
480                                                           io->write_buf->str + io->written,
481                                                           io->write_buf->len - io->written,
482                                                           io->blocking,
483                                                           cancellable, error);
484                         if (nwrote == -1)
485                                 return FALSE;
486                         io->written += nwrote;
487                 }
488
            /* Headers fully written: reset for the body (or for the
             * next header block).
             */
489                 io->written = 0;
490                 g_string_truncate (io->write_buf, 0);
491
492                 if (io->mode == SOUP_MESSAGE_IO_SERVER &&
493                     SOUP_STATUS_IS_INFORMATIONAL (msg->status_code)) {
494                         if (msg->status_code == SOUP_STATUS_CONTINUE) {
495                                 /* Stop and wait for the body now */
496                                 io->write_state =
497                                         SOUP_MESSAGE_IO_STATE_BLOCKING;
498                                 io->read_state = SOUP_MESSAGE_IO_STATE_BODY;
499                         } else {
500                                 /* We just wrote a 1xx response
501                                  * header, so stay in STATE_HEADERS.
502                                  * (The caller will pause us from the
503                                  * wrote_informational callback if he
504                                  * is not ready to send the final
505                                  * response.)
506                                  */
507                         }
508
509                         soup_message_wrote_informational (msg);
510                         soup_message_cleanup_response (msg);
511                         break;
512                 }
513
            /* With Content-Length encoding, remember how many body
             * bytes are to be written; the header set depends on
             * which side we are.
             */
514                 if (io->write_encoding == SOUP_ENCODING_CONTENT_LENGTH) {
515                         SoupMessageHeaders *hdrs =
516                                 (io->mode == SOUP_MESSAGE_IO_CLIENT) ?
517                                 msg->request_headers : msg->response_headers;
518                         io->write_length = soup_message_headers_get_content_length (hdrs);
519                 }
520
521                 if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
522                     soup_message_headers_get_expectations (msg->request_headers) & SOUP_EXPECTATION_CONTINUE) {
523                         /* Need to wait for the Continue response */
524                         io->write_state = SOUP_MESSAGE_IO_STATE_BLOCKING;
525                         io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
526                 } else {
527                         io->write_state = SOUP_MESSAGE_IO_STATE_BODY_START;
528
529                         /* If the client was waiting for a Continue
530                          * but we sent something else, then they're
531                          * now done writing.
532                          */
533                         if (io->mode == SOUP_MESSAGE_IO_SERVER &&
534                             io->read_state == SOUP_MESSAGE_IO_STATE_BLOCKING)
535                                 io->read_state = SOUP_MESSAGE_IO_STATE_DONE;
536                 }
537
538                 soup_message_wrote_headers (msg);
539                 break;
540
541
542         case SOUP_MESSAGE_IO_STATE_BODY_START:
            /* Wrap the base stream in a body stream built from the
             * chosen encoding and length.
             */
543                 io->body_ostream = soup_body_output_stream_new (io->ostream,
544                                                                 io->write_encoding,
545                                                                 io->write_length);
546                 io->write_state = SOUP_MESSAGE_IO_STATE_BODY;
547                 break;
548
549
550         case SOUP_MESSAGE_IO_STATE_BODY:
            /* A zero remaining length means the body is finished,
             * except for EOF and chunked encodings, where
             * write_length is not what bounds the body.
             */
551                 if (!io->write_length &&
552                     io->write_encoding != SOUP_ENCODING_EOF &&
553                     io->write_encoding != SOUP_ENCODING_CHUNKED) {
554                         io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
555                         break;
556                 }
557
            /* Fetch the next chunk if none is in flight. No chunk
             * available pauses the message; a zero-length chunk ends
             * the body.
             */
558                 if (!io->write_chunk) {
559                         io->write_chunk = soup_message_body_get_chunk (io->write_body, io->write_body_offset);
560                         if (!io->write_chunk) {
561                                 soup_message_io_pause (msg);
562                                 return FALSE;
563                         }
564                         if (!io->write_chunk->length) {
565                                 io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
566                                 break;
567                         }
568                 }
569
570                 nwrote = g_pollable_stream_write (io->body_ostream,
571                                                   io->write_chunk->data + io->written,
572                                                   io->write_chunk->length - io->written,
573                                                   io->blocking,
574                                                   cancellable, error);
575                 if (nwrote == -1)
576                         return FALSE;
577
            /* Sub-buffer covering exactly the bytes just written, to
             * hand to the wrote_body_data emission below.
             */
578                 chunk = soup_buffer_new_subbuffer (io->write_chunk,
579                                                    io->written, nwrote);
580                 io->written += nwrote;
581                 if (io->write_length)
582                         io->write_length -= nwrote;
583
            /* State is updated before the emission below. */
584                 if (io->written == io->write_chunk->length)
585                         io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DATA;
586
587                 soup_message_wrote_body_data (msg, chunk);
588                 soup_buffer_free (chunk);
589                 break;
590
591
592         case SOUP_MESSAGE_IO_STATE_BODY_DATA:
            /* A whole chunk has been written: do the per-chunk
             * bookkeeping and go back to BODY for the next one.
             */
593                 io->written = 0;
594                 if (io->write_chunk->length == 0) {
595                         io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
596                         break;
597                 }
598
            /* Only servers, or clients whose message has the
             * CAN_REBUILD flag, report the chunk back to the body.
             */
599                 if (io->mode == SOUP_MESSAGE_IO_SERVER ||
600                     priv->msg_flags & SOUP_MESSAGE_CAN_REBUILD)
601                         soup_message_body_wrote_chunk (io->write_body, io->write_chunk);
602                 io->write_body_offset += io->write_chunk->length;
603                 soup_buffer_free (io->write_chunk);
604                 io->write_chunk = NULL;
605
606                 io->write_state = SOUP_MESSAGE_IO_STATE_BODY;
607                 soup_message_wrote_chunk (msg);
608                 break;
609
610
611         case SOUP_MESSAGE_IO_STATE_BODY_DONE:
            /* Close the body stream, if one was created. On failure
             * the state is left unchanged and FALSE is returned.
             */
612                 if (io->body_ostream) {
613                         if (!g_output_stream_close (io->body_ostream, cancellable, error))
614                                 return FALSE;
615                         g_clear_object (&io->body_ostream);
616                 }
617
618                 io->write_state = SOUP_MESSAGE_IO_STATE_FINISHING;
619                 soup_message_wrote_body (msg);
620                 break;
621
622
623         case SOUP_MESSAGE_IO_STATE_FINISHING:
624                 io->write_state = SOUP_MESSAGE_IO_STATE_DONE;
625
            /* A client that has finished writing starts reading the
             * response headers.
             */
626                 if (io->mode == SOUP_MESSAGE_IO_CLIENT)
627                         io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
628                 break;
629
630
631         case SOUP_MESSAGE_IO_STATE_DONE:
632         default:
            /* Not expected to be called in DONE or an unknown state. */
633                 g_return_val_if_reached (FALSE);
634         }
635
636         return TRUE;
637 }
638
639 /* Attempts to push forward the reading side of @msg's I/O. Returns
640  * %TRUE if it manages to make some progress, and it is likely that
641  * further progress can be made. Returns %FALSE if it has reached a
642  * stopping point of some sort (need input from the application,
643  * socket not readable, read is complete, etc).
644  */
645 static gboolean
646 io_read (SoupMessage *msg, GCancellable *cancellable, GError **error)
647 {
648         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
649         SoupMessageIOData *io = priv->io_data;
650         guchar *stack_buf = NULL;
651         gssize nread;
652         SoupBuffer *buffer;
653         guint status;
654
655         switch (io->read_state) {
656         case SOUP_MESSAGE_IO_STATE_NOT_STARTED:
657         case SOUP_MESSAGE_IO_STATE_BLOCKING:
658                 return FALSE;
659
660
661         case SOUP_MESSAGE_IO_STATE_HEADERS:
662                 if (!read_headers (msg, cancellable, error))
663                         return FALSE;
664
665                 status = io->parse_headers_cb (msg, (char *)io->read_header_buf->data,
666                                                io->read_header_buf->len,
667                                                &io->read_encoding,
668                                                io->header_data);
669                 g_byte_array_set_size (io->read_header_buf, 0);
670
671                 if (status != SOUP_STATUS_OK) {
672                         /* Either we couldn't parse the headers, or they
673                          * indicated something that would mean we wouldn't
674                          * be able to parse the body. (Eg, unknown
675                          * Transfer-Encoding.). Skip the rest of the
676                          * reading, and make sure the connection gets
677                          * closed when we're done.
678                          */
679                         soup_message_set_status (msg, status);
680                         soup_message_headers_append (msg->request_headers,
681                                                      "Connection", "close");
682                         io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING;
683                         break;
684                 }
685
686                 if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
687                     SOUP_STATUS_IS_INFORMATIONAL (msg->status_code)) {
688                         if (msg->status_code == SOUP_STATUS_CONTINUE &&
689                             io->write_state == SOUP_MESSAGE_IO_STATE_BLOCKING) {
690                                 /* Pause the reader, unpause the writer */
691                                 io->read_state =
692                                         SOUP_MESSAGE_IO_STATE_BLOCKING;
693                                 io->write_state =
694                                         SOUP_MESSAGE_IO_STATE_BODY_START;
695                         } else {
696                                 /* Just stay in HEADERS */
697                                 io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
698                         }
699
700                         /* Informational responses have no bodies, so
701                          * bail out here rather than parsing encoding, etc
702                          */
703                         soup_message_got_informational (msg);
704                         soup_message_cleanup_response (msg);
705                         break;
706                 } else if (io->mode == SOUP_MESSAGE_IO_SERVER &&
707                            soup_message_headers_get_expectations (msg->request_headers) & SOUP_EXPECTATION_CONTINUE) {
708                         /* The client requested a Continue response. The
709                          * got_headers handler may change this to something
710                          * else though.
711                          */
712                         soup_message_set_status (msg, SOUP_STATUS_CONTINUE);
713                         io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
714                         io->read_state = SOUP_MESSAGE_IO_STATE_BLOCKING;
715                 } else {
716                         io->read_state = SOUP_MESSAGE_IO_STATE_BODY;
717
718                         /* If the client was waiting for a Continue
719                          * but got something else, then it's done
720                          * writing.
721                          */
722                         if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
723                             io->write_state == SOUP_MESSAGE_IO_STATE_BLOCKING)
724                                 io->write_state = SOUP_MESSAGE_IO_STATE_FINISHING;
725                 }
726
727                 if (io->read_encoding == SOUP_ENCODING_CONTENT_LENGTH) {
728                         SoupMessageHeaders *hdrs =
729                                 (io->mode == SOUP_MESSAGE_IO_CLIENT) ?
730                                 msg->response_headers : msg->request_headers;
731                         io->read_length = soup_message_headers_get_content_length (hdrs);
732
733                         if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
734                             !soup_message_is_keepalive (msg)) {
735                                 /* Some servers suck and send
736                                  * incorrect Content-Length values, so
737                                  * allow EOF termination in this case
738                                  * (iff the message is too short) too.
739                                  */
740                                 io->read_encoding = SOUP_ENCODING_EOF;
741                         }
742                 } else
743                         io->read_length = -1;
744
745                 io->body_istream = soup_body_input_stream_new (SOUP_FILTER_INPUT_STREAM (io->istream),
746                                                                io->read_encoding,
747                                                                io->read_length);
748                 soup_message_got_headers (msg);
749                 break;
750
751
752         case SOUP_MESSAGE_IO_STATE_BODY:
753                 if (!io_handle_sniffing (msg, FALSE))
754                         return FALSE;
755
756                 if (priv->chunk_allocator) {
757                         buffer = priv->chunk_allocator (msg, io->read_length, priv->chunk_allocator_data);
758                         if (!buffer) {
759                                 soup_message_io_pause (msg);
760                                 return FALSE;
761                         }
762                 } else {
763                         if (!stack_buf)
764                                 stack_buf = alloca (RESPONSE_BLOCK_SIZE);
765                         buffer = soup_buffer_new (SOUP_MEMORY_TEMPORARY,
766                                                   stack_buf,
767                                                   RESPONSE_BLOCK_SIZE);
768                 }
769
770                 nread = g_pollable_stream_read (io->body_istream,
771                                                 (guchar *)buffer->data,
772                                                 buffer->length,
773                                                 io->blocking,
774                                                 cancellable, error);
775                 if (nread > 0) {
776                         buffer->length = nread;
777                         buffer = content_decode (msg, buffer);
778                         if (!buffer)
779                                 break;
780
781                         soup_message_body_got_chunk (io->read_body, buffer);
782
783                         if (io->need_content_sniffed) {
784                                 soup_message_body_append_buffer (io->sniff_data, buffer);
785                                 soup_buffer_free (buffer);
786                                 io->need_got_chunk = TRUE;
787                                 if (!io_handle_sniffing (msg, FALSE))
788                                         return FALSE;
789                                 break;
790                         }
791
792                         soup_message_got_chunk (msg, buffer);
793                         soup_buffer_free (buffer);
794                         break;
795                 }
796
797                 soup_buffer_free (buffer);
798                 if (nread == -1)
799                         return FALSE;
800
801                 /* else nread == 0 */
802                 io->read_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
803                 break;
804
805
806         case SOUP_MESSAGE_IO_STATE_BODY_DONE:
807                 if (!io_handle_sniffing (msg, TRUE))
808                         return FALSE;
809
810                 io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING;
811                 soup_message_got_body (msg);
812                 break;
813
814
815         case SOUP_MESSAGE_IO_STATE_FINISHING:
816                 io->read_state = SOUP_MESSAGE_IO_STATE_DONE;
817
818                 if (io->mode == SOUP_MESSAGE_IO_SERVER)
819                         io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
820                 break;
821
822
823         case SOUP_MESSAGE_IO_STATE_DONE:
824         default:
825                 g_return_val_if_reached (FALSE);
826         }
827
828         return TRUE;
829 }
830
831 static GSource *
832 soup_message_io_get_source (SoupMessage *msg, GCancellable *cancellable,
833                             GSourceFunc callback, gpointer user_data)
834 {
835         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
836         SoupMessageIOData *io = priv->io_data;
837         GSource *source;
838
839         if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state)) {
840                 source = g_pollable_input_stream_create_source (
841                         G_POLLABLE_INPUT_STREAM (io->istream), cancellable);
842         } else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state)) {
843                 source = g_pollable_output_stream_create_source (
844                         G_POLLABLE_OUTPUT_STREAM (io->ostream), cancellable);
845         } else
846                 g_return_val_if_reached (NULL);
847
848         g_source_set_callback (source, callback, user_data, NULL);
849         return source;
850 }
851
852 static gboolean io_run (GObject *stream, SoupMessage *msg);
853
854 static void
855 setup_io_source (SoupMessage *msg)
856 {
857         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
858         SoupMessageIOData *io = priv->io_data;
859
860         io->io_source = soup_message_io_get_source (msg, NULL,
861                                                     (GSourceFunc)io_run, msg);
862         g_source_attach (io->io_source, io->async_context);
863         g_source_unref (io->io_source);
864 }
865
866 static gboolean
867 io_run (GObject *stream, SoupMessage *msg)
868 {
869         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
870         SoupMessageIOData *io = priv->io_data;
871         GError *error = NULL;
872
873         if (io->io_source) {
874                 g_source_destroy (io->io_source);
875                 io->io_source = NULL;
876         }
877
878         g_object_ref (msg);
879
880         while (priv->io_data == io && !io->paused) {
881                 gboolean progress = FALSE;
882
883                 if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state))
884                         progress = io_read (msg, io->cancellable, &error);
885                 else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state))
886                         progress = io_write (msg, io->cancellable, &error);
887
888                 if (!progress)
889                         break;
890         }
891
892         if (error) {
893                 if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
894                         g_clear_error (&error);
895                         setup_io_source (msg);
896                 } else
897                         io_error (io->sock, msg, error);
898         } else if (priv->io_data == io &&
899                    io->read_state == SOUP_MESSAGE_IO_STATE_DONE &&
900                    io->write_state == SOUP_MESSAGE_IO_STATE_DONE)
901                 soup_message_io_finished (msg);
902
903         g_object_unref (msg);
904         return FALSE;
905 }
906
907
908 static SoupMessageIOData *
909 new_iostate (SoupMessage *msg, SoupSocket *sock, SoupMessageIOMode mode,
910              SoupMessageGetHeadersFn get_headers_cb,
911              SoupMessageParseHeadersFn parse_headers_cb,
912              gpointer header_data,
913              SoupMessageCompletionFn completion_cb,
914              gpointer completion_data)
915 {
916         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
917         SoupMessageIOData *io;
918         gboolean non_blocking, use_thread_context;
919
920         io = g_slice_new0 (SoupMessageIOData);
921         io->mode = mode;
922         io->get_headers_cb   = get_headers_cb;
923         io->parse_headers_cb = parse_headers_cb;
924         io->header_data      = header_data;
925         io->completion_cb    = completion_cb;
926         io->completion_data  = completion_data;
927
928         io->sock = g_object_ref (sock);
929         io->istream = SOUP_FILTER_INPUT_STREAM (soup_socket_get_input_stream (sock));
930         if (io->istream)
931                 g_object_add_weak_pointer (G_OBJECT (io->istream), (gpointer *)&io->istream);
932         io->ostream = soup_socket_get_output_stream (sock);
933         if (io->ostream)
934                 g_object_add_weak_pointer (G_OBJECT (io->ostream), (gpointer *)&io->ostream);
935
936         g_object_get (io->sock,
937                       SOUP_SOCKET_FLAG_NONBLOCKING, &non_blocking,
938                       SOUP_SOCKET_USE_THREAD_CONTEXT, &use_thread_context,
939                       NULL);
940         io->blocking = !non_blocking;
941
942         if (use_thread_context) {
943                 io->async_context = g_main_context_get_thread_default ();
944                 if (io->async_context)
945                         g_main_context_ref (io->async_context);
946         } else {
947                 g_object_get (io->sock,
948                               SOUP_SOCKET_ASYNC_CONTEXT, &io->async_context,
949                               NULL);
950         }
951
952         io->read_header_buf = g_byte_array_new ();
953         io->write_buf       = g_string_new (NULL);
954
955         io->read_state  = SOUP_MESSAGE_IO_STATE_NOT_STARTED;
956         io->write_state = SOUP_MESSAGE_IO_STATE_NOT_STARTED;
957
958         if (priv->io_data)
959                 soup_message_io_cleanup (msg);
960         priv->io_data = io;
961         return io;
962 }
963
964 void
965 soup_message_io_client (SoupMessageQueueItem *item,
966                         SoupMessageGetHeadersFn get_headers_cb,
967                         SoupMessageParseHeadersFn parse_headers_cb,
968                         gpointer header_data,
969                         SoupMessageCompletionFn completion_cb,
970                         gpointer completion_data)
971 {
972         SoupMessageIOData *io;
973         SoupSocket *sock = soup_connection_get_socket (item->conn);
974
975         io = new_iostate (item->msg, sock, SOUP_MESSAGE_IO_CLIENT,
976                           get_headers_cb, parse_headers_cb, header_data,
977                           completion_cb, completion_data);
978
979         io->item = item;
980         soup_message_queue_item_ref (item);
981         io->cancellable = item->cancellable;
982
983         io->read_body       = item->msg->response_body;
984         io->write_body      = item->msg->request_body;
985
986         io->write_state     = SOUP_MESSAGE_IO_STATE_HEADERS;
987         io_run (NULL, item->msg);
988 }
989
990 void
991 soup_message_io_server (SoupMessage *msg, SoupSocket *sock,
992                         SoupMessageGetHeadersFn get_headers_cb,
993                         SoupMessageParseHeadersFn parse_headers_cb,
994                         gpointer header_data,
995                         SoupMessageCompletionFn completion_cb,
996                         gpointer completion_data)
997 {
998         SoupMessageIOData *io;
999
1000         io = new_iostate (msg, sock, SOUP_MESSAGE_IO_SERVER,
1001                           get_headers_cb, parse_headers_cb, header_data,
1002                           completion_cb, completion_data);
1003
1004         io->read_body       = msg->request_body;
1005         io->write_body      = msg->response_body;
1006
1007         io->read_state      = SOUP_MESSAGE_IO_STATE_HEADERS;
1008         io_run (NULL, msg);
1009 }
1010
1011 void  
1012 soup_message_io_pause (SoupMessage *msg)
1013 {
1014         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
1015         SoupMessageIOData *io = priv->io_data;
1016
1017         g_return_if_fail (io != NULL);
1018
1019         if (io->io_source) {
1020                 g_source_destroy (io->io_source);
1021                 io->io_source = NULL;
1022         }
1023
1024         if (io->unpause_source) {
1025                 g_source_destroy (io->unpause_source);
1026                 io->unpause_source = NULL;
1027         }
1028
1029         io->paused = TRUE;
1030 }
1031
1032 static gboolean
1033 io_unpause_internal (gpointer msg)
1034 {
1035         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
1036         SoupMessageIOData *io = priv->io_data;
1037
1038         g_return_val_if_fail (io != NULL, FALSE);
1039         io->unpause_source = NULL;
1040         io->paused = FALSE;
1041
1042         if (io->io_source)
1043                 return FALSE;
1044
1045         io_run (NULL, msg);
1046         return FALSE;
1047 }
1048
1049 void
1050 soup_message_io_unpause (SoupMessage *msg)
1051 {
1052         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
1053         SoupMessageIOData *io = priv->io_data;
1054
1055         g_return_if_fail (io != NULL);
1056
1057         if (!io->blocking) {
1058                 if (!io->unpause_source) {
1059                         io->unpause_source = soup_add_completion (
1060                                 io->async_context, io_unpause_internal, msg);
1061                 }
1062         } else
1063                 io_unpause_internal (msg);
1064 }
1065
1066 /**
1067  * soup_message_io_in_progress:
1068  * @msg: a #SoupMessage
1069  *
1070  * Tests whether or not I/O is currently in progress on @msg.
1071  *
1072  * Return value: whether or not I/O is currently in progress.
1073  **/
1074 gboolean
1075 soup_message_io_in_progress (SoupMessage *msg)
1076 {
1077         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
1078
1079         return priv->io_data != NULL;
1080 }