Reapplying patch to disable attempts to use gtk-doc
[profile/ivi/libsoup2.4.git] / libsoup / soup-message-io.c
1 /* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
2 /*
3  * soup-message-io.c: HTTP message I/O
4  *
5  * Copyright (C) 2000-2003, Ximian, Inc.
6  */
7
8 #ifdef HAVE_CONFIG_H
9 #include "config.h"
10 #endif
11
12 #include <stdlib.h>
13 #include <string.h>
14
15 #include <glib/gi18n-lib.h>
16
17 #include "soup-body-input-stream.h"
18 #include "soup-body-output-stream.h"
19 #include "soup-client-input-stream.h"
20 #include "soup-connection.h"
21 #include "soup-content-sniffer-stream.h"
22 #include "soup-converter-wrapper.h"
23 #include "soup-filter-input-stream.h"
24 #include "soup-message.h"
25 #include "soup-message-private.h"
26 #include "soup-message-queue.h"
27 #include "soup-misc.h"
28 #include "soup-misc-private.h"
29
/* Which side of the HTTP exchange an I/O state is driving. The functions
 * in this file use it to choose headers (CLIENT writes request_headers and
 * reads response_headers; SERVER does the opposite) and to decide which
 * side of the state machine to kick off when the other side finishes.
 */
30 typedef enum {
31         SOUP_MESSAGE_IO_CLIENT,
32         SOUP_MESSAGE_IO_SERVER
33 } SoupMessageIOMode;
34
/* Progress of one direction (read or write) of a message's I/O.
 *
 * The declaration order is significant: code in this file compares states
 * numerically (e.g. "read_state < SOUP_MESSAGE_IO_STATE_FINISHING" and
 * "read_state <= SOUP_MESSAGE_IO_STATE_HEADERS"), so new values must be
 * inserted with care.
 */
35 typedef enum {
36         SOUP_MESSAGE_IO_STATE_NOT_STARTED,
           /* Alias with the same numeric value as NOT_STARTED. */
37         SOUP_MESSAGE_IO_STATE_ANY = SOUP_MESSAGE_IO_STATE_NOT_STARTED,
38         SOUP_MESSAGE_IO_STATE_HEADERS,
           /* This direction is parked until the other direction moves it
            * on (used for the "Expect: 100-continue" handshake).
            */
39         SOUP_MESSAGE_IO_STATE_BLOCKING,
40         SOUP_MESSAGE_IO_STATE_BODY_START,
41         SOUP_MESSAGE_IO_STATE_BODY,
           /* Write side only in the visible code: a chunk has been fully
            * written and is about to be released.
            */
42         SOUP_MESSAGE_IO_STATE_BODY_DATA,
43         SOUP_MESSAGE_IO_STATE_BODY_DONE,
44         SOUP_MESSAGE_IO_STATE_FINISHING,
45         SOUP_MESSAGE_IO_STATE_DONE
46 } SoupMessageIOState;
47
/* SOUP_MESSAGE_IO_STATE_ACTIVE: true when @state is neither NOT_STARTED,
 * BLOCKING nor DONE, i.e. this direction currently has work to do.
 *
 * SOUP_MESSAGE_IO_STATE_POLLABLE: an active state other than BODY_DONE.
 *
 * @state is parenthesized so that any expression (including a conditional
 * expression) can be passed safely. Note that it is still evaluated more
 * than once, so it must not have side effects.
 */
#define SOUP_MESSAGE_IO_STATE_ACTIVE(state) \
        ((state) != SOUP_MESSAGE_IO_STATE_NOT_STARTED && \
         (state) != SOUP_MESSAGE_IO_STATE_BLOCKING && \
         (state) != SOUP_MESSAGE_IO_STATE_DONE)
#define SOUP_MESSAGE_IO_STATE_POLLABLE(state) \
        (SOUP_MESSAGE_IO_STATE_ACTIVE (state) && \
         (state) != SOUP_MESSAGE_IO_STATE_BODY_DONE)
55
/* Per-message I/O state. One instance hangs off the message's private data
 * (priv->io_data) while I/O is set up, and is released by
 * soup_message_io_cleanup().
 */
56 typedef struct {
           /* Queue item for the message; may be NULL (callers check it).
            * io_error() reads item->conn and sets item->state through it.
            */
57         SoupMessageQueueItem *item;
58         SoupMessageIOMode     mode;
59         GCancellable         *cancellable;
60
           /* iostream is unreffed on cleanup and closed by
            * soup_message_io_stop() when there is no connection to
            * disconnect. istream/ostream carry the headers; the body_*
            * streams are layered on top of them (see setup_body_istream()
            * and the BODY_START case of io_write()).
            */
61         GIOStream              *iostream;
62         SoupFilterInputStream  *istream;
63         GInputStream           *body_istream;
64         GOutputStream          *ostream;
65         GOutputStream          *body_ostream;
66         GMainContext           *async_context;
           /* Passed as the "blocking" argument of every stream call. */
67         gboolean                blocking;
68
           /* Read side. read_header_buf accumulates raw header bytes in
            * read_headers(); read_length is the Content-Length when
            * read_encoding is SOUP_ENCODING_CONTENT_LENGTH, else -1.
            */
69         SoupMessageIOState    read_state;
70         SoupEncoding          read_encoding;
71         GByteArray           *read_header_buf;
72         SoupMessageBody      *read_body;
73         goffset               read_length;
74
           /* Write side. write_buf holds the serialized headers produced by
            * get_headers_cb; write_chunk is the body chunk currently being
            * written; write_body_offset is the offset of that chunk within
            * write_body; written counts bytes of write_buf or write_chunk
            * already sent.
            */
75         SoupMessageIOState    write_state;
76         SoupEncoding          write_encoding;
77         GString              *write_buf;
78         SoupMessageBody      *write_body;
79         SoupBuffer           *write_chunk;
80         goffset               write_body_offset;
81         goffset               write_length;
82         goffset               written;
83
           /* Both sources are destroyed and cleared by
            * soup_message_io_stop(). paused is consulted by
            * message_source_check().
            */
84         GSource *io_source;
85         GSource *unpause_source;
86         gboolean paused;
87
           /* header_data is the user data for get_headers_cb and
            * parse_headers_cb; completion_data is the user data for
            * completion_cb, which soup_message_io_finished() invokes.
            */
88         SoupMessageGetHeadersFn   get_headers_cb;
89         SoupMessageParseHeadersFn parse_headers_cb;
90         gpointer                  header_data;
91         SoupMessageCompletionFn   completion_cb;
92         gpointer                  completion_data;
93 } SoupMessageIOData;
94         
95
96 #define RESPONSE_BLOCK_SIZE 8192
97
98 void
99 soup_message_io_cleanup (SoupMessage *msg)
100 {
101         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
102         SoupMessageIOData *io;
103
104         soup_message_io_stop (msg);
105
106         io = priv->io_data;
107         if (!io)
108                 return;
109         priv->io_data = NULL;
110
111         if (io->iostream)
112                 g_object_unref (io->iostream);
113         if (io->body_istream)
114                 g_object_unref (io->body_istream);
115         if (io->body_ostream)
116                 g_object_unref (io->body_ostream);
117         if (io->async_context)
118                 g_main_context_unref (io->async_context);
119         if (io->item)
120                 soup_message_queue_item_unref (io->item);
121
122         g_byte_array_free (io->read_header_buf, TRUE);
123
124         g_string_free (io->write_buf, TRUE);
125         if (io->write_chunk)
126                 soup_buffer_free (io->write_chunk);
127
128         g_slice_free (SoupMessageIOData, io);
129 }
130
131 void
132 soup_message_io_stop (SoupMessage *msg)
133 {
134         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
135         SoupMessageIOData *io = priv->io_data;
136
137         if (!io)
138                 return;
139
140         if (io->io_source) {
141                 g_source_destroy (io->io_source);
142                 g_source_unref (io->io_source);
143                 io->io_source = NULL;
144         }
145
146         if (io->unpause_source) {
147                 g_source_destroy (io->unpause_source);
148                 g_source_unref (io->unpause_source);
149                 io->unpause_source = NULL;
150         }
151
152         if (io->read_state < SOUP_MESSAGE_IO_STATE_FINISHING) {
153                 if (io->item && io->item->conn)
154                         soup_connection_disconnect (io->item->conn);
155                 else
156                         g_io_stream_close (io->iostream, NULL, NULL);
157         }
158 }
159
160 void
161 soup_message_io_finished (SoupMessage *msg)
162 {
163         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
164         SoupMessageIOData *io = priv->io_data;
165         SoupMessageCompletionFn completion_cb = io->completion_cb;
166         gpointer completion_data = io->completion_data;
167
168         g_object_ref (msg);
169         soup_message_io_cleanup (msg);
170         if (completion_cb)
171                 completion_cb (msg, completion_data);
172         g_object_unref (msg);
173 }
174
175 static gboolean
176 request_is_idempotent (SoupMessage *msg)
177 {
178         /* FIXME */
179         return (msg->method == SOUP_METHOD_GET);
180 }
181
182 static void
183 io_error (SoupMessage *msg, GError *error)
184 {
185         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
186         SoupMessageIOData *io = priv->io_data;
187
188         if (error && error->domain == G_TLS_ERROR) {
189                 soup_message_set_status_full (msg,
190                                               SOUP_STATUS_SSL_FAILED,
191                                               error->message);
192         } else if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
193                    io->read_state <= SOUP_MESSAGE_IO_STATE_HEADERS &&
194                    io->read_header_buf->len == 0 &&
195                    soup_connection_get_ever_used (io->item->conn) &&
196                    !g_error_matches (error, G_IO_ERROR, G_IO_ERROR_TIMED_OUT) &&
197                    request_is_idempotent (msg)) {
198                 /* Connection got closed, but we can safely try again */
199                 io->item->state = SOUP_MESSAGE_RESTARTING;
200         } else if (!SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code))
201                 soup_message_set_status (msg, SOUP_STATUS_IO_ERROR);
202
203         if (error)
204                 g_error_free (error);
205
206         soup_message_io_finished (msg);
207 }
208
209 static gboolean
210 read_headers (SoupMessage *msg, GCancellable *cancellable, GError **error)
211 {
212         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
213         SoupMessageIOData *io = priv->io_data;
214         gssize nread, old_len;
215         gboolean got_lf;
216
217         while (1) {
218                 old_len = io->read_header_buf->len;
219                 g_byte_array_set_size (io->read_header_buf, old_len + RESPONSE_BLOCK_SIZE);
220                 nread = soup_filter_input_stream_read_line (io->istream,
221                                                             io->read_header_buf->data + old_len,
222                                                             RESPONSE_BLOCK_SIZE,
223                                                             io->blocking,
224                                                             &got_lf,
225                                                             cancellable, error);
226                 io->read_header_buf->len = old_len + MAX (nread, 0);
227                 if (nread == 0) {
228                         soup_message_set_status (msg, SOUP_STATUS_MALFORMED);
229                         g_set_error_literal (error, G_IO_ERROR,
230                                              G_IO_ERROR_PARTIAL_INPUT,
231                                              _("Connection terminated unexpectedly"));
232                 }
233                 if (nread <= 0)
234                         return FALSE;
235
236                 if (got_lf) {
237                         if (nread == 1 && old_len >= 2 &&
238                             !strncmp ((char *)io->read_header_buf->data +
239                                       io->read_header_buf->len - 2,
240                                       "\n\n", 2))
241                                 break;
242                         else if (nread == 2 && old_len >= 3 &&
243                                  !strncmp ((char *)io->read_header_buf->data +
244                                            io->read_header_buf->len - 3,
245                                            "\n\r\n", 3))
246                                 break;
247                 }
248         }
249
250         /* We need to "rewind" io->read_header_buf back one line.
251          * That SHOULD be two characters (CR LF), but if the
252          * web server was stupid, it might only be one.
253          */
254         if (io->read_header_buf->len < 3 ||
255             io->read_header_buf->data[io->read_header_buf->len - 2] == '\n')
256                 io->read_header_buf->len--;
257         else
258                 io->read_header_buf->len -= 2;
259         io->read_header_buf->data[io->read_header_buf->len] = '\0';
260
261         return TRUE;
262 }
263
264 static void
265 setup_body_istream (SoupMessage *msg)
266 {
267         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
268         SoupMessageIOData *io = priv->io_data;
269         GConverter *decoder, *wrapper;
270         GInputStream *filter;
271         GSList *d;
272
273         io->body_istream =
274                 soup_body_input_stream_new (io->istream,
275                                             io->read_encoding,
276                                             io->read_length);
277
278         for (d = priv->decoders; d; d = d->next) {
279                 decoder = d->data;
280                 wrapper = soup_converter_wrapper_new (decoder, msg);
281                 filter = g_object_new (G_TYPE_CONVERTER_INPUT_STREAM,
282                                        "base-stream", io->body_istream,
283                                        "converter", wrapper,
284                                        NULL);
285                 g_object_unref (io->body_istream);
286                 io->body_istream = filter;
287         }
288
289         if (priv->sniffer) {
290                 filter = soup_content_sniffer_stream_new (priv->sniffer,
291                                                           msg, io->body_istream);
292                 g_object_unref (io->body_istream);
293                 io->body_istream = filter;
294         }
295 }
296
297 /*
298  * There are two request/response formats: the basic request/response,
299  * possibly with one or more unsolicited informational responses (such
300  * as the WebDAV "102 Processing" response):
301  *
302  *     Client                            Server
303  *      W:HEADERS  / R:NOT_STARTED    ->  R:HEADERS  / W:NOT_STARTED
304  *      W:BODY     / R:NOT_STARTED    ->  R:BODY     / W:NOT_STARTED
305  *     [W:DONE     / R:HEADERS (1xx)  <-  R:DONE     / W:HEADERS (1xx) ...]
306  *      W:DONE     / R:HEADERS        <-  R:DONE     / W:HEADERS
307  *      W:DONE     / R:BODY           <-  R:DONE     / W:BODY
308  *      W:DONE     / R:DONE               R:DONE     / W:DONE
309  *     
310  * and the "Expect: 100-continue" request/response, with the client
311  * blocking halfway through its request, and then either continuing or
312  * aborting, depending on the server response:
313  *
314  *     Client                            Server
315  *      W:HEADERS  / R:NOT_STARTED    ->  R:HEADERS  / W:NOT_STARTED
316  *      W:BLOCKING / R:HEADERS        <-  R:BLOCKING / W:HEADERS
317  *     [W:BODY     / R:BLOCKING       ->  R:BODY     / W:BLOCKING]
318  *     [W:DONE     / R:HEADERS        <-  R:DONE     / W:HEADERS]
319  *      W:DONE     / R:BODY           <-  R:DONE     / W:BODY
320  *      W:DONE     / R:DONE               R:DONE     / W:DONE
321  */
322
323 /* Attempts to push forward the writing side of @msg's I/O. Returns
324  * %TRUE if it manages to make some progress, and it is likely that
325  * further progress can be made. Returns %FALSE if it has reached a
326  * stopping point of some sort (need input from the application,
327  * socket not writable, write is complete, etc).
328  */
/* Write-side state machine: each call performs the work of the current
 * io->write_state only, then returns so the caller can re-enter. State is
 * always updated before the corresponding soup_message_wrote_*() call.
 */
329 static gboolean
330 io_write (SoupMessage *msg, GCancellable *cancellable, GError **error)
331 {
332         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
333         SoupMessageIOData *io = priv->io_data;
334         SoupBuffer *chunk;
335         gssize nwrote;
336
337         switch (io->write_state) {
338         case SOUP_MESSAGE_IO_STATE_HEADERS:
                    /* An empty buffer means the headers have not been
                     * serialized yet; a non-empty one means an earlier
                     * call stopped part-way and is being resumed.
                     */
339                 if (!io->write_buf->len) {
340                         io->get_headers_cb (msg, io->write_buf,
341                                             &io->write_encoding,
342                                             io->header_data);
343                 }
344
                    /* io->written records progress through write_buf, so a
                     * failed write leaves it in place for the next call.
                     */
345                 while (io->written < io->write_buf->len) {
346                         nwrote = g_pollable_stream_write (io->ostream,
347                                                           io->write_buf->str + io->written,
348                                                           io->write_buf->len - io->written,
349                                                           io->blocking,
350                                                           cancellable, error);
351                         if (nwrote == -1)
352                                 return FALSE;
353                         io->written += nwrote;
354                 }
355
                    /* Headers fully sent: reset the buffer for reuse. */
356                 io->written = 0;
357                 g_string_truncate (io->write_buf, 0);
358
359                 if (io->mode == SOUP_MESSAGE_IO_SERVER &&
360                     SOUP_STATUS_IS_INFORMATIONAL (msg->status_code)) {
361                         if (msg->status_code == SOUP_STATUS_CONTINUE) {
362                                 /* Stop and wait for the body now */
363                                 io->write_state =
364                                         SOUP_MESSAGE_IO_STATE_BLOCKING;
365                                 io->read_state = SOUP_MESSAGE_IO_STATE_BODY_START;
366                         } else {
367                                 /* We just wrote a 1xx response
368                                  * header, so stay in STATE_HEADERS.
369                                  * (The caller will pause us from the
370                                  * wrote_informational callback if he
371                                  * is not ready to send the final
372                                  * response.)
373                                  */
374                         }
375
376                         soup_message_wrote_informational (msg);
377                         soup_message_cleanup_response (msg);
378                         break;
379                 }
380
                    /* For Content-Length encoding, take the body length
                     * from the headers of the side being written.
                     */
381                 if (io->write_encoding == SOUP_ENCODING_CONTENT_LENGTH) {
382                         SoupMessageHeaders *hdrs =
383                                 (io->mode == SOUP_MESSAGE_IO_CLIENT) ?
384                                 msg->request_headers : msg->response_headers;
385                         io->write_length = soup_message_headers_get_content_length (hdrs);
386                 }
387
388                 if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
389                     soup_message_headers_get_expectations (msg->request_headers) & SOUP_EXPECTATION_CONTINUE) {
390                         /* Need to wait for the Continue response */
391                         io->write_state = SOUP_MESSAGE_IO_STATE_BLOCKING;
392                         io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
393                 } else {
394                         io->write_state = SOUP_MESSAGE_IO_STATE_BODY_START;
395
396                         /* If the client was waiting for a Continue
397                          * but we sent something else, then they're
398                          * now done writing.
399                          */
400                         if (io->mode == SOUP_MESSAGE_IO_SERVER &&
401                             io->read_state == SOUP_MESSAGE_IO_STATE_BLOCKING)
402                                 io->read_state = SOUP_MESSAGE_IO_STATE_DONE;
403                 }
404
405                 soup_message_wrote_headers (msg);
406                 break;
407
408
409         case SOUP_MESSAGE_IO_STATE_BODY_START:
                    /* Layer the body stream (which applies write_encoding)
                     * over the raw output stream.
                     */
410                 io->body_ostream = soup_body_output_stream_new (io->ostream,
411                                                                 io->write_encoding,
412                                                                 io->write_length);
413                 io->write_state = SOUP_MESSAGE_IO_STATE_BODY;
414                 break;
415
416
417         case SOUP_MESSAGE_IO_STATE_BODY:
                    /* Nothing left to send, and the encoding is neither
                     * EOF- nor chunk-terminated: the body is complete.
                     */
418                 if (!io->write_length &&
419                     io->write_encoding != SOUP_ENCODING_EOF &&
420                     io->write_encoding != SOUP_ENCODING_CHUNKED) {
421                         io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
422                         break;
423                 }
424
425                 if (!io->write_chunk) {
426                         io->write_chunk = soup_message_body_get_chunk (io->write_body, io->write_body_offset);
                            /* No chunk available yet: pause until the
                             * application supplies more data. This path is
                             * asserted not to happen for new-API items.
                             */
427                         if (!io->write_chunk) {
428                                 g_return_val_if_fail (!io->item || !io->item->new_api, FALSE);
429                                 soup_message_io_pause (msg);
430                                 return FALSE;
431                         }
                            /* A zero-length chunk ends the body. */
432                         if (!io->write_chunk->length) {
433                                 io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
434                                 break;
435                         }
436                 }
437
438                 nwrote = g_pollable_stream_write (io->body_ostream,
439                                                   io->write_chunk->data + io->written,
440                                                   io->write_chunk->length - io->written,
441                                                   io->blocking,
442                                                   cancellable, error);
443                 if (nwrote == -1)
444                         return FALSE;
445
                    /* Report exactly the bytes just written. The subbuffer
                     * is created before io->written is advanced so that it
                     * starts at the old offset.
                     */
446                 chunk = soup_buffer_new_subbuffer (io->write_chunk,
447                                                    io->written, nwrote);
448                 io->written += nwrote;
449                 if (io->write_length)
450                         io->write_length -= nwrote;
451
452                 if (io->written == io->write_chunk->length)
453                         io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DATA;
454
455                 soup_message_wrote_body_data (msg, chunk);
456                 soup_buffer_free (chunk);
457                 break;
458
459
460         case SOUP_MESSAGE_IO_STATE_BODY_DATA:
                    /* The current chunk has been written in full. */
461                 io->written = 0;
462                 if (io->write_chunk->length == 0) {
463                         io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
464                         break;
465                 }
466
                    /* Tell the body the chunk was written (server side, or
                     * when the message can be rebuilt), then advance to
                     * the next chunk.
                     */
467                 if (io->mode == SOUP_MESSAGE_IO_SERVER ||
468                     priv->msg_flags & SOUP_MESSAGE_CAN_REBUILD)
469                         soup_message_body_wrote_chunk (io->write_body, io->write_chunk);
470                 io->write_body_offset += io->write_chunk->length;
471                 soup_buffer_free (io->write_chunk);
472                 io->write_chunk = NULL;
473
474                 io->write_state = SOUP_MESSAGE_IO_STATE_BODY;
475                 soup_message_wrote_chunk (msg);
476                 break;
477
478
479         case SOUP_MESSAGE_IO_STATE_BODY_DONE:
                    /* Close the body stream before announcing completion;
                     * on failure the state is unchanged so the close is
                     * attempted again on the next call.
                     */
480                 if (io->body_ostream) {
481                         if (!g_output_stream_close (io->body_ostream, cancellable, error))
482                                 return FALSE;
483                         g_clear_object (&io->body_ostream);
484                 }
485
486                 io->write_state = SOUP_MESSAGE_IO_STATE_FINISHING;
487                 soup_message_wrote_body (msg);
488                 break;
489
490
491         case SOUP_MESSAGE_IO_STATE_FINISHING:
492                 io->write_state = SOUP_MESSAGE_IO_STATE_DONE;
493
                    /* A client that has finished sending its request now
                     * starts reading the response headers.
                     */
494                 if (io->mode == SOUP_MESSAGE_IO_CLIENT)
495                         io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
496                 break;
497
498
499         default:
500                 g_return_val_if_reached (FALSE);
501         }
502
503         return TRUE;
504 }
505
506 /* Attempts to push forward the reading side of @msg's I/O. Returns
507  * %TRUE if it manages to make some progress, and it is likely that
508  * further progress can be made. Returns %FALSE if it has reached a
509  * stopping point of some sort (need input from the application,
510  * socket not readable, read is complete, etc).
511  */
/* Read-side state machine: each call performs the work of the current
 * io->read_state only, then returns so the caller can re-enter. State is
 * always updated before the corresponding soup_message_got_*() call.
 */
512 static gboolean
513 io_read (SoupMessage *msg, GCancellable *cancellable, GError **error)
514 {
515         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
516         SoupMessageIOData *io = priv->io_data;
517         guchar *stack_buf = NULL;
518         gssize nread;
519         SoupBuffer *buffer;
520         guint status;
521
522         switch (io->read_state) {
523         case SOUP_MESSAGE_IO_STATE_HEADERS:
524                 if (!read_headers (msg, cancellable, error))
525                         return FALSE;
526
                    /* Parse the buffered header block, then empty the
                     * buffer so it can be reused for the next header block.
                     */
527                 status = io->parse_headers_cb (msg, (char *)io->read_header_buf->data,
528                                                io->read_header_buf->len,
529                                                &io->read_encoding,
530                                                io->header_data);
531                 g_byte_array_set_size (io->read_header_buf, 0);
532
533                 if (status != SOUP_STATUS_OK) {
534                         /* Either we couldn't parse the headers, or they
535                          * indicated something that would mean we wouldn't
536                          * be able to parse the body. (Eg, unknown
537                          * Transfer-Encoding.). Skip the rest of the
538                          * reading, and make sure the connection gets
539                          * closed when we're done.
540                          */
541                         soup_message_set_status (msg, status);
542                         soup_message_headers_append (msg->request_headers,
543                                                      "Connection", "close");
544                         io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING;
545                         break;
546                 }
547
548                 if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
549                     SOUP_STATUS_IS_INFORMATIONAL (msg->status_code)) {
550                         if (msg->status_code == SOUP_STATUS_CONTINUE &&
551                             io->write_state == SOUP_MESSAGE_IO_STATE_BLOCKING) {
552                                 /* Pause the reader, unpause the writer */
553                                 io->read_state =
554                                         SOUP_MESSAGE_IO_STATE_BLOCKING;
555                                 io->write_state =
556                                         SOUP_MESSAGE_IO_STATE_BODY_START;
557                         } else {
558                                 /* Just stay in HEADERS */
559                                 io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
560                         }
561
562                         /* Informational responses have no bodies, so
563                          * bail out here rather than parsing encoding, etc
564                          */
565                         soup_message_got_informational (msg);
566                         soup_message_cleanup_response (msg);
567                         break;
568                 } else if (io->mode == SOUP_MESSAGE_IO_SERVER &&
569                            soup_message_headers_get_expectations (msg->request_headers) & SOUP_EXPECTATION_CONTINUE) {
570                         /* The client requested a Continue response. The
571                          * got_headers handler may change this to something
572                          * else though.
573                          */
574                         soup_message_set_status (msg, SOUP_STATUS_CONTINUE);
575                         io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
576                         io->read_state = SOUP_MESSAGE_IO_STATE_BLOCKING;
577                 } else {
578                         io->read_state = SOUP_MESSAGE_IO_STATE_BODY_START;
579
580                         /* If the client was waiting for a Continue
581                          * but got something else, then it's done
582                          * writing.
583                          */
584                         if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
585                             io->write_state == SOUP_MESSAGE_IO_STATE_BLOCKING)
586                                 io->write_state = SOUP_MESSAGE_IO_STATE_FINISHING;
587                 }
588
                    /* For Content-Length encoding, take the body length
                     * from the headers of the side being read; otherwise
                     * the length is recorded as -1.
                     */
589                 if (io->read_encoding == SOUP_ENCODING_CONTENT_LENGTH) {
590                         SoupMessageHeaders *hdrs =
591                                 (io->mode == SOUP_MESSAGE_IO_CLIENT) ?
592                                 msg->response_headers : msg->request_headers;
593                         io->read_length = soup_message_headers_get_content_length (hdrs);
594
595                         if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
596                             !soup_message_is_keepalive (msg)) {
597                                 /* Some servers suck and send
598                                  * incorrect Content-Length values, so
599                                  * allow EOF termination in this case
600                                  * (iff the message is too short) too.
601                                  */
602                                 io->read_encoding = SOUP_ENCODING_EOF;
603                         }
604                 } else
605                         io->read_length = -1;
606
607                 soup_message_got_headers (msg);
608                 break;
609
610
611         case SOUP_MESSAGE_IO_STATE_BODY_START:
                    /* Build the body stream only once; if the sniffer below
                     * is not ready yet, this state is entered again.
                     */
612                 if (!io->body_istream)
613                         setup_body_istream (msg);
614
615                 if (priv->sniffer) {
616                         SoupContentSnifferStream *sniffer_stream = SOUP_CONTENT_SNIFFER_STREAM (io->body_istream);
617                         const char *content_type;
618                         GHashTable *params;
619
620                         if (!soup_content_sniffer_stream_is_ready (sniffer_stream, io->blocking, cancellable, error))
621                                 return FALSE;
622
623                         content_type = soup_content_sniffer_stream_sniff (sniffer_stream, &params);
624                         soup_message_content_sniffed (msg, content_type, params);
625                 }
626
627                 io->read_state = SOUP_MESSAGE_IO_STATE_BODY;
628                 break;
629
630
631         case SOUP_MESSAGE_IO_STATE_BODY:
                    /* Get a buffer to read into: from the application's
                     * chunk allocator if one is set, otherwise a temporary
                     * buffer wrapping stack memory.
                     */
632                 if (priv->chunk_allocator) {
633                         buffer = priv->chunk_allocator (msg, io->read_length, priv->chunk_allocator_data);
                            /* The allocator declined: pause until the
                             * application resumes us. This path is asserted
                             * not to happen for new-API items.
                             */
634                         if (!buffer) {
635                                 g_return_val_if_fail (!io->item || !io->item->new_api, FALSE);
636                                 soup_message_io_pause (msg);
637                                 return FALSE;
638                         }
639                 } else {
640                         if (!stack_buf)
641                                 stack_buf = alloca (RESPONSE_BLOCK_SIZE);
642                         buffer = soup_buffer_new (SOUP_MEMORY_TEMPORARY,
643                                                   stack_buf,
644                                                   RESPONSE_BLOCK_SIZE);
645                 }
646
647                 nread = g_pollable_stream_read (io->body_istream,
648                                                 (guchar *)buffer->data,
649                                                 buffer->length,
650                                                 io->blocking,
651                                                 cancellable, error);
                    /* Got data: shrink the buffer to the bytes read and
                     * hand it to the body and to the got_chunk signal.
                     */
652                 if (nread > 0) {
653                         buffer->length = nread;
654                         soup_message_body_got_chunk (io->read_body, buffer);
655                         soup_message_got_chunk (msg, buffer);
656                         soup_buffer_free (buffer);
657                         break;
658                 }
659
660                 soup_buffer_free (buffer);
661                 if (nread == -1)
662                         return FALSE;
663
664                 /* else nread == 0 */
665                 io->read_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
666                 break;
667
668
669         case SOUP_MESSAGE_IO_STATE_BODY_DONE:
670                 io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING;
671                 soup_message_got_body (msg);
672                 break;
673
674
675         case SOUP_MESSAGE_IO_STATE_FINISHING:
676                 io->read_state = SOUP_MESSAGE_IO_STATE_DONE;
677
                    /* A server that has finished reading the request now
                     * starts writing the response headers.
                     */
678                 if (io->mode == SOUP_MESSAGE_IO_SERVER)
679                         io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
680                 break;
681
682
683         default:
684                 g_return_val_if_reached (FALSE);
685         }
686
687         return TRUE;
688 }
689
/* Custom GSource tied to a message. The GSource member comes first so a
 * GSource pointer can be cast to SoupMessageSource, as the message_source_*
 * functions do.
 */
690 typedef struct {
691         GSource source;
            /* Unreffed in message_source_finalize(). */
692         SoupMessage *msg;
            /* When set, message_source_check() reports the source ready
             * once the message's I/O is no longer paused (compare with
             * io->paused there).
             */
693         gboolean paused;
694 } SoupMessageSource;
695
696 static gboolean
697 message_source_check (GSource *source)
698 {
699         SoupMessageSource *message_source = (SoupMessageSource *)source;
700
701         if (message_source->paused) {
702                 SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (message_source->msg);
703                 SoupMessageIOData *io = priv->io_data;
704
705                 if (!io || io->paused)
706                         return FALSE;
707                 else
708                         return TRUE;
709         } else
710                 return FALSE;
711 }
712
713 static gboolean
714 message_source_prepare (GSource *source,
715                         gint    *timeout)
716 {
717         *timeout = -1;
718         return message_source_check (source);
719 }
720
721 static gboolean
722 message_source_dispatch (GSource     *source,
723                          GSourceFunc  callback,
724                          gpointer     user_data)
725 {
726         SoupMessageSourceFunc func = (SoupMessageSourceFunc)callback;
727         SoupMessageSource *message_source = (SoupMessageSource *)source;
728
729         return (*func) (message_source->msg, user_data);
730 }
731
732 static void
733 message_source_finalize (GSource *source)
734 {
735         SoupMessageSource *message_source = (SoupMessageSource *)source;
736
737         g_object_unref (message_source->msg);
738 }
739
740 static gboolean
741 message_source_closure_callback (SoupMessage *msg,
742                                  gpointer     data)
743 {
744         GClosure *closure = data;
745         GValue param = G_VALUE_INIT;
746         GValue result_value = G_VALUE_INIT;
747         gboolean result;
748
749         g_value_init (&result_value, G_TYPE_BOOLEAN);
750
751         g_value_init (&param, SOUP_TYPE_MESSAGE);
752         g_value_set_object (&param, msg);
753
754         g_closure_invoke (closure, &result_value, 1, &param, NULL);
755
756         result = g_value_get_boolean (&result_value);
757         g_value_unset (&result_value);
758         g_value_unset (&param);
759
760         return result;
761 }
762
763 static GSourceFuncs message_source_funcs =
764 {
765         message_source_prepare,
766         message_source_check,
767         message_source_dispatch,
768         message_source_finalize,
769         (GSourceFunc)message_source_closure_callback,
770         (GSourceDummyMarshal)g_cclosure_marshal_generic,
771 };
772
773 GSource *
774 soup_message_io_get_source (SoupMessage *msg, GCancellable *cancellable,
775                             SoupMessageSourceFunc callback, gpointer user_data)
776 {
777         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
778         SoupMessageIOData *io = priv->io_data;
779         GSource *base_source, *source;
780         SoupMessageSource *message_source;
781
782         if (!io) {
783                 base_source = g_timeout_source_new (0);
784         } else if (io->paused) {
785                 base_source = NULL;
786         } else if (SOUP_MESSAGE_IO_STATE_POLLABLE (io->read_state)) {
787                 GPollableInputStream *istream;
788
789                 if (io->body_istream)
790                         istream = G_POLLABLE_INPUT_STREAM (io->body_istream);
791                 else
792                         istream = G_POLLABLE_INPUT_STREAM (io->istream);
793                 base_source = g_pollable_input_stream_create_source (istream, cancellable);
794         } else if (SOUP_MESSAGE_IO_STATE_POLLABLE (io->write_state)) {
795                 GPollableOutputStream *ostream;
796
797                 if (io->body_ostream)
798                         ostream = G_POLLABLE_OUTPUT_STREAM (io->body_ostream);
799                 else
800                         ostream = G_POLLABLE_OUTPUT_STREAM (io->ostream);
801                 base_source = g_pollable_output_stream_create_source (ostream, cancellable);
802         } else
803                 base_source = g_timeout_source_new (0);
804
805         source = g_source_new (&message_source_funcs,
806                                sizeof (SoupMessageSource));
807         g_source_set_name (source, "SoupMessageSource");
808         message_source = (SoupMessageSource *)source;
809         message_source->msg = g_object_ref (msg);
810         message_source->paused = io && io->paused;
811
812         if (base_source) {
813                 g_source_set_dummy_callback (base_source);
814                 g_source_add_child_source (source, base_source);
815                 g_source_unref (base_source);
816         }
817         g_source_set_callback (source, (GSourceFunc) callback, user_data, NULL);
818         return source;
819 }
820
821 static gboolean
822 io_run_until (SoupMessage *msg,
823               SoupMessageIOState read_state, SoupMessageIOState write_state,
824               GCancellable *cancellable, GError **error)
825 {
826         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
827         SoupMessageIOData *io = priv->io_data;
828         gboolean progress = TRUE, done;
829         GError *my_error = NULL;
830
831         if (g_cancellable_set_error_if_cancelled (cancellable, error))
832                 return FALSE;
833         else if (!io) {
834                 g_set_error_literal (error, G_IO_ERROR,
835                                      G_IO_ERROR_CANCELLED,
836                                      _("Operation was cancelled"));
837                 return FALSE;
838         }
839
840         g_object_ref (msg);
841
842         while (progress && priv->io_data == io && !io->paused &&
843                (io->read_state < read_state || io->write_state < write_state)) {
844
845                 if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state))
846                         progress = io_read (msg, cancellable, &my_error);
847                 else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state))
848                         progress = io_write (msg, cancellable, &my_error);
849                 else
850                         progress = FALSE;
851         }
852
853         if (my_error) {
854                 g_propagate_error (error, my_error);
855                 g_object_unref (msg);
856                 return FALSE;
857         } else if (g_cancellable_set_error_if_cancelled (cancellable, error)) {
858                 g_object_unref (msg);
859                 return FALSE;
860         } else if (priv->io_data != io) {
861                 g_set_error_literal (error, G_IO_ERROR,
862                                      G_IO_ERROR_CANCELLED,
863                                      _("Operation was cancelled"));
864                 g_object_unref (msg);
865                 return FALSE;
866         }
867
868         done = (io->read_state >= read_state &&
869                 io->write_state >= write_state);
870
871         if (io->paused && !done) {
872                 g_set_error_literal (error, G_IO_ERROR,
873                                      G_IO_ERROR_WOULD_BLOCK,
874                                      _("Operation would block"));
875                 g_object_unref (msg);
876                 return FALSE;
877         }
878
879         g_object_unref (msg);
880         return done;
881 }
882
883 static gboolean
884 io_run (SoupMessage *msg, gpointer user_data)
885 {
886         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
887         SoupMessageIOData *io = priv->io_data;
888         GError *error = NULL;
889         GCancellable *cancellable;
890
891         if (io->io_source) {
892                 g_source_destroy (io->io_source);
893                 g_source_unref (io->io_source);
894                 io->io_source = NULL;
895         }
896
897         g_object_ref (msg);
898         cancellable = io->cancellable ? g_object_ref (io->cancellable) : NULL;
899
900         if (io_run_until (msg,
901                           SOUP_MESSAGE_IO_STATE_DONE,
902                           SOUP_MESSAGE_IO_STATE_DONE,
903                           cancellable, &error)) {
904                 soup_message_io_finished (msg);
905         } else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
906                 g_clear_error (&error);
907                 io->io_source = soup_message_io_get_source (msg, NULL, io_run, msg);
908                 g_source_attach (io->io_source, io->async_context);
909         } else if (error && priv->io_data == io) {
910                 io_error (msg, error);
911         }
912
913         g_object_unref (msg);
914         g_clear_object (&cancellable);
915
916         return FALSE;
917 }
918
919 gboolean
920 soup_message_io_run_until_write (SoupMessage *msg,
921                                  GCancellable *cancellable, GError **error)
922 {
923         return io_run_until (msg,
924                              SOUP_MESSAGE_IO_STATE_ANY,
925                              SOUP_MESSAGE_IO_STATE_BODY,
926                              cancellable, error);
927 }
928
929 gboolean
930 soup_message_io_run_until_read (SoupMessage *msg,
931                                 GCancellable *cancellable, GError **error)
932 {
933         return io_run_until (msg,
934                              SOUP_MESSAGE_IO_STATE_BODY,
935                              SOUP_MESSAGE_IO_STATE_ANY,
936                              cancellable, error);
937 }
938
939 gboolean
940 soup_message_io_run_until_finish (SoupMessage   *msg,
941                                   GCancellable  *cancellable,
942                                   GError       **error)
943 {
944         g_object_ref (msg);
945
946         if (!io_run_until (msg,
947                            SOUP_MESSAGE_IO_STATE_DONE,
948                            SOUP_MESSAGE_IO_STATE_DONE,
949                            cancellable, error))
950                 return FALSE;
951
952         soup_message_io_finished (msg);
953         g_object_unref (msg);
954         return TRUE;
955 }
956
957 static void
958 client_stream_eof (SoupClientInputStream *stream, gpointer user_data)
959 {
960         SoupMessage *msg = user_data;
961         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
962         SoupMessageIOData *io = priv->io_data;
963
964         if (io && io->read_state == SOUP_MESSAGE_IO_STATE_BODY)
965                 io->read_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
966 }
967
968 GInputStream *
969 soup_message_io_get_response_istream (SoupMessage  *msg,
970                                       GError      **error)
971 {
972         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
973         SoupMessageIOData *io = priv->io_data;
974         GInputStream *client_stream;
975
976         g_return_val_if_fail (io->mode == SOUP_MESSAGE_IO_CLIENT, NULL);
977
978         if (SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code)) {
979                 g_set_error_literal (error, SOUP_HTTP_ERROR,
980                                      msg->status_code, msg->reason_phrase);
981                 return NULL;
982         }
983
984         client_stream = soup_client_input_stream_new (io->body_istream, msg);
985         g_signal_connect (client_stream, "eof",
986                           G_CALLBACK (client_stream_eof), msg);
987
988         return client_stream;
989 }
990
991
992 static SoupMessageIOData *
993 new_iostate (SoupMessage *msg, GIOStream *iostream,
994              GMainContext *async_context, SoupMessageIOMode mode,
995              SoupMessageGetHeadersFn get_headers_cb,
996              SoupMessageParseHeadersFn parse_headers_cb,
997              gpointer header_data,
998              SoupMessageCompletionFn completion_cb,
999              gpointer completion_data)
1000 {
1001         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
1002         SoupMessageIOData *io;
1003
1004         io = g_slice_new0 (SoupMessageIOData);
1005         io->mode = mode;
1006         io->get_headers_cb   = get_headers_cb;
1007         io->parse_headers_cb = parse_headers_cb;
1008         io->header_data      = header_data;
1009         io->completion_cb    = completion_cb;
1010         io->completion_data  = completion_data;
1011
1012         io->iostream = g_object_ref (iostream);
1013         io->istream = SOUP_FILTER_INPUT_STREAM (g_io_stream_get_input_stream (iostream));
1014         io->ostream = g_io_stream_get_output_stream (iostream);
1015
1016         if (async_context) {
1017                 io->async_context = g_main_context_ref (async_context);
1018                 io->blocking = FALSE;
1019         } else
1020                 io->blocking = TRUE;
1021
1022         io->read_header_buf = g_byte_array_new ();
1023         io->write_buf       = g_string_new (NULL);
1024
1025         io->read_state  = SOUP_MESSAGE_IO_STATE_NOT_STARTED;
1026         io->write_state = SOUP_MESSAGE_IO_STATE_NOT_STARTED;
1027
1028         if (priv->io_data)
1029                 soup_message_io_cleanup (msg);
1030         priv->io_data = io;
1031         return io;
1032 }
1033
1034 void
1035 soup_message_io_client (SoupMessageQueueItem *item,
1036                         GIOStream *iostream,
1037                         GMainContext *async_context,
1038                         SoupMessageGetHeadersFn get_headers_cb,
1039                         SoupMessageParseHeadersFn parse_headers_cb,
1040                         gpointer header_data,
1041                         SoupMessageCompletionFn completion_cb,
1042                         gpointer completion_data)
1043 {
1044         SoupMessageIOData *io;
1045
1046         io = new_iostate (item->msg, iostream, async_context,
1047                           SOUP_MESSAGE_IO_CLIENT,
1048                           get_headers_cb, parse_headers_cb, header_data,
1049                           completion_cb, completion_data);
1050
1051         io->item = item;
1052         soup_message_queue_item_ref (item);
1053         io->cancellable = item->cancellable;
1054
1055         io->read_body       = item->msg->response_body;
1056         io->write_body      = item->msg->request_body;
1057
1058         io->write_state     = SOUP_MESSAGE_IO_STATE_HEADERS;
1059         if (!item->new_api)
1060                 io_run (item->msg, NULL);
1061 }
1062
1063 void
1064 soup_message_io_server (SoupMessage *msg,
1065                         GIOStream *iostream, GMainContext *async_context,
1066                         SoupMessageGetHeadersFn get_headers_cb,
1067                         SoupMessageParseHeadersFn parse_headers_cb,
1068                         gpointer header_data,
1069                         SoupMessageCompletionFn completion_cb,
1070                         gpointer completion_data)
1071 {
1072         SoupMessageIOData *io;
1073
1074         io = new_iostate (msg, iostream, async_context,
1075                           SOUP_MESSAGE_IO_SERVER,
1076                           get_headers_cb, parse_headers_cb, header_data,
1077                           completion_cb, completion_data);
1078
1079         io->read_body       = msg->request_body;
1080         io->write_body      = msg->response_body;
1081
1082         io->read_state      = SOUP_MESSAGE_IO_STATE_HEADERS;
1083         io_run (msg, NULL);
1084 }
1085
1086 void  
1087 soup_message_io_pause (SoupMessage *msg)
1088 {
1089         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
1090         SoupMessageIOData *io = priv->io_data;
1091
1092         g_return_if_fail (io != NULL);
1093
1094         if (io->item && io->item->new_api)
1095                 g_return_if_fail (io->read_state < SOUP_MESSAGE_IO_STATE_BODY);
1096
1097         if (io->io_source) {
1098                 g_source_destroy (io->io_source);
1099                 g_source_unref (io->io_source);
1100                 io->io_source = NULL;
1101         }
1102
1103         if (io->unpause_source) {
1104                 g_source_destroy (io->unpause_source);
1105                 io->unpause_source = NULL;
1106         }
1107
1108         io->paused = TRUE;
1109 }
1110
1111 static gboolean
1112 io_unpause_internal (gpointer msg)
1113 {
1114         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
1115         SoupMessageIOData *io = priv->io_data;
1116
1117         g_return_val_if_fail (io != NULL, FALSE);
1118         io->unpause_source = NULL;
1119         io->paused = FALSE;
1120
1121         if (io->io_source)
1122                 return FALSE;
1123
1124         io_run (msg, NULL);
1125         return FALSE;
1126 }
1127
1128 void
1129 soup_message_io_unpause (SoupMessage *msg)
1130 {
1131         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
1132         SoupMessageIOData *io = priv->io_data;
1133
1134         g_return_if_fail (io != NULL);
1135
1136         if (io->item && io->item->new_api) {
1137                 g_return_if_fail (io->read_state < SOUP_MESSAGE_IO_STATE_BODY);
1138                 io->paused = FALSE;
1139                 return;
1140         }
1141
1142         if (!io->blocking) {
1143                 if (!io->unpause_source) {
1144                         io->unpause_source = soup_add_completion (
1145                                 io->async_context, io_unpause_internal, msg);
1146                 }
1147         } else
1148                 io_unpause_internal (msg);
1149 }
1150
1151 /**
1152  * soup_message_io_in_progress:
1153  * @msg: a #SoupMessage
1154  *
1155  * Tests whether or not I/O is currently in progress on @msg.
1156  *
1157  * Return value: whether or not I/O is currently in progress.
1158  **/
1159 gboolean
1160 soup_message_io_in_progress (SoupMessage *msg)
1161 {
1162         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
1163
1164         return priv->io_data != NULL;
1165 }