SoupConnection: track reusability internally
[platform/upstream/libsoup.git] / libsoup / soup-message-io.c
1 /* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
2 /*
3  * soup-message-io.c: HTTP message I/O
4  *
5  * Copyright (C) 2000-2003, Ximian, Inc.
6  */
7
8 #ifdef HAVE_CONFIG_H
9 #include "config.h"
10 #endif
11
12 #include <glib/gi18n-lib.h>
13
14 #include "soup.h"
15 #include "soup-body-input-stream.h"
16 #include "soup-body-output-stream.h"
17 #include "soup-client-input-stream.h"
18 #include "soup-connection.h"
19 #include "soup-content-processor.h"
20 #include "soup-content-sniffer-stream.h"
21 #include "soup-filter-input-stream.h"
22 #include "soup-message-private.h"
23 #include "soup-message-queue.h"
24 #include "soup-misc-private.h"
25
/* Which side of the HTTP exchange a message's I/O is running on. The
 * state machines in this file branch on this to decide, for example,
 * whether the request or the response headers describe the body
 * being written.
 */
typedef enum {
	SOUP_MESSAGE_IO_CLIENT = 0,
	SOUP_MESSAGE_IO_SERVER = 1
} SoupMessageIOMode;
30
/* Progress of one direction (reading or writing) of a message's I/O.
 *
 * The numeric order of the values is significant: code in this file
 * compares states with '<' (for instance "write_state < FINISHING"),
 * so new states must be inserted at the right position.
 */
typedef enum {
	SOUP_MESSAGE_IO_STATE_NOT_STARTED = 0,
	SOUP_MESSAGE_IO_STATE_ANY         = SOUP_MESSAGE_IO_STATE_NOT_STARTED,
	SOUP_MESSAGE_IO_STATE_HEADERS     = 1,
	SOUP_MESSAGE_IO_STATE_BLOCKING    = 2,
	SOUP_MESSAGE_IO_STATE_BODY_START  = 3,
	SOUP_MESSAGE_IO_STATE_BODY        = 4,
	SOUP_MESSAGE_IO_STATE_BODY_DATA   = 5,
	SOUP_MESSAGE_IO_STATE_BODY_DONE   = 6,
	SOUP_MESSAGE_IO_STATE_FINISHING   = 7,
	SOUP_MESSAGE_IO_STATE_DONE        = 8
} SoupMessageIOState;
43
/* SOUP_MESSAGE_IO_STATE_ACTIVE(state): true unless @state is
 * NOT_STARTED, BLOCKING or DONE.
 *
 * SOUP_MESSAGE_IO_STATE_POLLABLE(state): true when @state is active
 * and is not BODY_DONE.
 *
 * The argument is parenthesized in the expansion so that compound
 * expressions (e.g. a conditional expression) are compared as a
 * whole. It is still evaluated more than once, so it must not have
 * side effects.
 */
#define SOUP_MESSAGE_IO_STATE_ACTIVE(state) \
	((state) != SOUP_MESSAGE_IO_STATE_NOT_STARTED && \
	 (state) != SOUP_MESSAGE_IO_STATE_BLOCKING && \
	 (state) != SOUP_MESSAGE_IO_STATE_DONE)
#define SOUP_MESSAGE_IO_STATE_POLLABLE(state) \
	(SOUP_MESSAGE_IO_STATE_ACTIVE (state) && \
	 (state) != SOUP_MESSAGE_IO_STATE_BODY_DONE)
51
52 typedef struct {
53         SoupMessageQueueItem *item;
54         SoupMessageIOMode     mode;
55         GCancellable         *cancellable;
56
57         GIOStream              *iostream;
58         SoupFilterInputStream  *istream;
59         GInputStream           *body_istream;
60         GOutputStream          *ostream;
61         GOutputStream          *body_ostream;
62         GMainContext           *async_context;
63         gboolean                blocking;
64
65         SoupMessageIOState    read_state;
66         SoupEncoding          read_encoding;
67         GByteArray           *read_header_buf;
68         SoupMessageBody      *read_body;
69         goffset               read_length;
70
71         SoupMessageIOState    write_state;
72         SoupEncoding          write_encoding;
73         GString              *write_buf;
74         SoupMessageBody      *write_body;
75         SoupBuffer           *write_chunk;
76         goffset               write_body_offset;
77         goffset               write_length;
78         goffset               written;
79
80         GSource *io_source;
81         GSource *unpause_source;
82         gboolean paused;
83
84         SoupMessageGetHeadersFn   get_headers_cb;
85         SoupMessageParseHeadersFn parse_headers_cb;
86         gpointer                  header_data;
87         SoupMessageCompletionFn   completion_cb;
88         gpointer                  completion_data;
89 } SoupMessageIOData;
90         
91
/* Number of bytes requested per read: read_headers() grows the header
 * buffer by this much for each line it reads, and io_read() uses it
 * as the size of its temporary body buffer when the message has no
 * chunk allocator.
 */
#define RESPONSE_BLOCK_SIZE 8192
93
94 void
95 soup_message_io_cleanup (SoupMessage *msg)
96 {
97         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
98         SoupMessageIOData *io;
99
100         soup_message_io_stop (msg);
101
102         io = priv->io_data;
103         if (!io)
104                 return;
105         priv->io_data = NULL;
106
107         if (io->iostream)
108                 g_object_unref (io->iostream);
109         if (io->body_istream)
110                 g_object_unref (io->body_istream);
111         if (io->body_ostream)
112                 g_object_unref (io->body_ostream);
113         if (io->async_context)
114                 g_main_context_unref (io->async_context);
115         if (io->item)
116                 soup_message_queue_item_unref (io->item);
117
118         g_byte_array_free (io->read_header_buf, TRUE);
119
120         g_string_free (io->write_buf, TRUE);
121         if (io->write_chunk)
122                 soup_buffer_free (io->write_chunk);
123
124         g_slice_free (SoupMessageIOData, io);
125 }
126
127 void
128 soup_message_io_stop (SoupMessage *msg)
129 {
130         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
131         SoupMessageIOData *io = priv->io_data;
132
133         if (!io)
134                 return;
135
136         if (io->io_source) {
137                 g_source_destroy (io->io_source);
138                 g_source_unref (io->io_source);
139                 io->io_source = NULL;
140         }
141
142         if (io->unpause_source) {
143                 g_source_destroy (io->unpause_source);
144                 g_source_unref (io->unpause_source);
145                 io->unpause_source = NULL;
146         }
147
148         if (io->mode == SOUP_MESSAGE_IO_SERVER) {
149                 if (io->write_state < SOUP_MESSAGE_IO_STATE_FINISHING)
150                         g_io_stream_close (io->iostream, NULL, NULL);
151         }
152 }
153
154 void
155 soup_message_io_finished (SoupMessage *msg)
156 {
157         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
158         SoupMessageIOData *io = priv->io_data;
159         SoupMessageCompletionFn completion_cb = io->completion_cb;
160         gpointer completion_data = io->completion_data;
161
162         g_object_ref (msg);
163         soup_message_io_cleanup (msg);
164         if (completion_cb)
165                 completion_cb (msg, completion_data);
166         g_object_unref (msg);
167 }
168
169 static gboolean
170 read_headers (SoupMessage *msg, GCancellable *cancellable, GError **error)
171 {
172         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
173         SoupMessageIOData *io = priv->io_data;
174         gssize nread, old_len;
175         gboolean got_lf;
176
177         while (1) {
178                 old_len = io->read_header_buf->len;
179                 g_byte_array_set_size (io->read_header_buf, old_len + RESPONSE_BLOCK_SIZE);
180                 nread = soup_filter_input_stream_read_line (io->istream,
181                                                             io->read_header_buf->data + old_len,
182                                                             RESPONSE_BLOCK_SIZE,
183                                                             io->blocking,
184                                                             &got_lf,
185                                                             cancellable, error);
186                 io->read_header_buf->len = old_len + MAX (nread, 0);
187                 if (nread == 0) {
188                         soup_message_set_status (msg, SOUP_STATUS_MALFORMED);
189                         g_set_error_literal (error, G_IO_ERROR,
190                                              G_IO_ERROR_PARTIAL_INPUT,
191                                              _("Connection terminated unexpectedly"));
192                 }
193                 if (nread <= 0)
194                         return FALSE;
195
196                 if (got_lf) {
197                         if (nread == 1 && old_len >= 2 &&
198                             !strncmp ((char *)io->read_header_buf->data +
199                                       io->read_header_buf->len - 2,
200                                       "\n\n", 2))
201                                 break;
202                         else if (nread == 2 && old_len >= 3 &&
203                                  !strncmp ((char *)io->read_header_buf->data +
204                                            io->read_header_buf->len - 3,
205                                            "\n\r\n", 3))
206                                 break;
207                 }
208         }
209
210         /* We need to "rewind" io->read_header_buf back one line.
211          * That SHOULD be two characters (CR LF), but if the
212          * web server was stupid, it might only be one.
213          */
214         if (io->read_header_buf->len < 3 ||
215             io->read_header_buf->data[io->read_header_buf->len - 2] == '\n')
216                 io->read_header_buf->len--;
217         else
218                 io->read_header_buf->len -= 2;
219         io->read_header_buf->data[io->read_header_buf->len] = '\0';
220
221         return TRUE;
222 }
223
224 static gint
225 processing_stage_cmp (gconstpointer a,
226                     gconstpointer b)
227 {
228         SoupProcessingStage stage_a = soup_content_processor_get_processing_stage (SOUP_CONTENT_PROCESSOR (a));
229         SoupProcessingStage stage_b = soup_content_processor_get_processing_stage (SOUP_CONTENT_PROCESSOR (b));
230
231         if (stage_a > stage_b)
232                 return 1;
233         if (stage_a == stage_b)
234                 return 0;
235         return -1;
236 }
237
238 GInputStream *
239 soup_message_setup_body_istream (GInputStream *body_stream,
240                                  SoupMessage *msg,
241                                  SoupSession *session,
242                                  SoupProcessingStage start_at_stage)
243 {
244         GInputStream *istream;
245         GSList *p, *processors;
246
247         istream = g_object_ref (body_stream);
248
249         processors = soup_session_get_features (session, SOUP_TYPE_CONTENT_PROCESSOR);
250         processors = g_slist_sort (processors, processing_stage_cmp);
251
252         for (p = processors; p; p = p->next) {
253                 GInputStream *wrapper;
254                 SoupContentProcessor *processor;
255
256                 processor = SOUP_CONTENT_PROCESSOR (p->data);
257                 if (soup_message_disables_feature (msg, p->data) ||
258                     soup_content_processor_get_processing_stage (processor) < start_at_stage)
259                         continue;
260
261                 wrapper = soup_content_processor_wrap_input (processor, istream, msg, NULL);
262                 if (wrapper) {
263                         g_object_unref (istream);
264                         istream = wrapper;
265                 }
266         }
267
268         g_slist_free (processors);
269
270         return istream;
271 }
272
273 /*
274  * There are two request/response formats: the basic request/response,
275  * possibly with one or more unsolicited informational responses (such
276  * as the WebDAV "102 Processing" response):
277  *
278  *     Client                            Server
279  *      W:HEADERS  / R:NOT_STARTED    ->  R:HEADERS  / W:NOT_STARTED
280  *      W:BODY     / R:NOT_STARTED    ->  R:BODY     / W:NOT_STARTED
281  *     [W:DONE     / R:HEADERS (1xx)  <-  R:DONE     / W:HEADERS (1xx) ...]
282  *      W:DONE     / R:HEADERS        <-  R:DONE     / W:HEADERS
283  *      W:DONE     / R:BODY           <-  R:DONE     / W:BODY
284  *      W:DONE     / R:DONE               R:DONE     / W:DONE
285  *     
286  * and the "Expect: 100-continue" request/response, with the client
287  * blocking halfway through its request, and then either continuing or
288  * aborting, depending on the server response:
289  *
290  *     Client                            Server
291  *      W:HEADERS  / R:NOT_STARTED    ->  R:HEADERS  / W:NOT_STARTED
292  *      W:BLOCKING / R:HEADERS        <-  R:BLOCKING / W:HEADERS
293  *     [W:BODY     / R:BLOCKING       ->  R:BODY     / W:BLOCKING]
294  *     [W:DONE     / R:HEADERS        <-  R:DONE     / W:HEADERS]
295  *      W:DONE     / R:BODY           <-  R:DONE     / W:BODY
296  *      W:DONE     / R:DONE               R:DONE     / W:DONE
297  */
298
299 /* Attempts to push forward the writing side of @msg's I/O. Returns
300  * %TRUE if it manages to make some progress, and it is likely that
301  * further progress can be made. Returns %FALSE if it has reached a
302  * stopping point of some sort (need input from the application,
303  * socket not writable, write is complete, etc).
304  */
305 static gboolean
306 io_write (SoupMessage *msg, GCancellable *cancellable, GError **error)
307 {
308         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
309         SoupMessageIOData *io = priv->io_data;
310         SoupBuffer *chunk;
311         gssize nwrote;
312
313         switch (io->write_state) {
314         case SOUP_MESSAGE_IO_STATE_HEADERS:
315                 if (!io->write_buf->len) {
316                         io->get_headers_cb (msg, io->write_buf,
317                                             &io->write_encoding,
318                                             io->header_data);
319                 }
320
321                 while (io->written < io->write_buf->len) {
322                         nwrote = g_pollable_stream_write (io->ostream,
323                                                           io->write_buf->str + io->written,
324                                                           io->write_buf->len - io->written,
325                                                           io->blocking,
326                                                           cancellable, error);
327                         if (nwrote == -1)
328                                 return FALSE;
329                         io->written += nwrote;
330                 }
331
332                 io->written = 0;
333                 g_string_truncate (io->write_buf, 0);
334
335                 if (io->mode == SOUP_MESSAGE_IO_SERVER &&
336                     SOUP_STATUS_IS_INFORMATIONAL (msg->status_code)) {
337                         if (msg->status_code == SOUP_STATUS_CONTINUE) {
338                                 /* Stop and wait for the body now */
339                                 io->write_state =
340                                         SOUP_MESSAGE_IO_STATE_BLOCKING;
341                                 io->read_state = SOUP_MESSAGE_IO_STATE_BODY_START;
342                         } else {
343                                 /* We just wrote a 1xx response
344                                  * header, so stay in STATE_HEADERS.
345                                  * (The caller will pause us from the
346                                  * wrote_informational callback if he
347                                  * is not ready to send the final
348                                  * response.)
349                                  */
350                         }
351
352                         soup_message_wrote_informational (msg);
353                         soup_message_cleanup_response (msg);
354                         break;
355                 }
356
357                 if (io->write_encoding == SOUP_ENCODING_CONTENT_LENGTH) {
358                         SoupMessageHeaders *hdrs =
359                                 (io->mode == SOUP_MESSAGE_IO_CLIENT) ?
360                                 msg->request_headers : msg->response_headers;
361                         io->write_length = soup_message_headers_get_content_length (hdrs);
362                 }
363
364                 if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
365                     soup_message_headers_get_expectations (msg->request_headers) & SOUP_EXPECTATION_CONTINUE) {
366                         /* Need to wait for the Continue response */
367                         io->write_state = SOUP_MESSAGE_IO_STATE_BLOCKING;
368                         io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
369                 } else {
370                         io->write_state = SOUP_MESSAGE_IO_STATE_BODY_START;
371
372                         /* If the client was waiting for a Continue
373                          * but we sent something else, then they're
374                          * now done writing.
375                          */
376                         if (io->mode == SOUP_MESSAGE_IO_SERVER &&
377                             io->read_state == SOUP_MESSAGE_IO_STATE_BLOCKING)
378                                 io->read_state = SOUP_MESSAGE_IO_STATE_DONE;
379                 }
380
381                 soup_message_wrote_headers (msg);
382                 break;
383
384
385         case SOUP_MESSAGE_IO_STATE_BODY_START:
386                 io->body_ostream = soup_body_output_stream_new (io->ostream,
387                                                                 io->write_encoding,
388                                                                 io->write_length);
389                 io->write_state = SOUP_MESSAGE_IO_STATE_BODY;
390                 break;
391
392
393         case SOUP_MESSAGE_IO_STATE_BODY:
394                 if (!io->write_length &&
395                     io->write_encoding != SOUP_ENCODING_EOF &&
396                     io->write_encoding != SOUP_ENCODING_CHUNKED) {
397                         io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
398                         break;
399                 }
400
401                 if (!io->write_chunk) {
402                         io->write_chunk = soup_message_body_get_chunk (io->write_body, io->write_body_offset);
403                         if (!io->write_chunk) {
404                                 g_return_val_if_fail (!io->item || !io->item->new_api, FALSE);
405                                 soup_message_io_pause (msg);
406                                 return FALSE;
407                         }
408                         if (!io->write_chunk->length) {
409                                 io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
410                                 break;
411                         }
412                 }
413
414                 nwrote = g_pollable_stream_write (io->body_ostream,
415                                                   io->write_chunk->data + io->written,
416                                                   io->write_chunk->length - io->written,
417                                                   io->blocking,
418                                                   cancellable, error);
419                 if (nwrote == -1)
420                         return FALSE;
421
422                 chunk = soup_buffer_new_subbuffer (io->write_chunk,
423                                                    io->written, nwrote);
424                 io->written += nwrote;
425                 if (io->write_length)
426                         io->write_length -= nwrote;
427
428                 if (io->written == io->write_chunk->length)
429                         io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DATA;
430
431                 soup_message_wrote_body_data (msg, chunk);
432                 soup_buffer_free (chunk);
433                 break;
434
435
436         case SOUP_MESSAGE_IO_STATE_BODY_DATA:
437                 io->written = 0;
438                 if (io->write_chunk->length == 0) {
439                         io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
440                         break;
441                 }
442
443                 if (io->mode == SOUP_MESSAGE_IO_SERVER ||
444                     priv->msg_flags & SOUP_MESSAGE_CAN_REBUILD)
445                         soup_message_body_wrote_chunk (io->write_body, io->write_chunk);
446                 io->write_body_offset += io->write_chunk->length;
447                 soup_buffer_free (io->write_chunk);
448                 io->write_chunk = NULL;
449
450                 io->write_state = SOUP_MESSAGE_IO_STATE_BODY;
451                 soup_message_wrote_chunk (msg);
452                 break;
453
454
455         case SOUP_MESSAGE_IO_STATE_BODY_DONE:
456                 if (io->body_ostream) {
457                         if (!g_output_stream_close (io->body_ostream, cancellable, error))
458                                 return FALSE;
459                         g_clear_object (&io->body_ostream);
460                 }
461
462                 io->write_state = SOUP_MESSAGE_IO_STATE_FINISHING;
463                 soup_message_wrote_body (msg);
464                 break;
465
466
467         case SOUP_MESSAGE_IO_STATE_FINISHING:
468                 io->write_state = SOUP_MESSAGE_IO_STATE_DONE;
469
470                 if (io->mode == SOUP_MESSAGE_IO_CLIENT)
471                         io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
472                 break;
473
474
475         default:
476                 g_return_val_if_reached (FALSE);
477         }
478
479         return TRUE;
480 }
481
482 /* Attempts to push forward the reading side of @msg's I/O. Returns
483  * %TRUE if it manages to make some progress, and it is likely that
484  * further progress can be made. Returns %FALSE if it has reached a
485  * stopping point of some sort (need input from the application,
486  * socket not readable, read is complete, etc).
487  */
488 static gboolean
489 io_read (SoupMessage *msg, GCancellable *cancellable, GError **error)
490 {
491         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
492         SoupMessageIOData *io = priv->io_data;
493         guchar *stack_buf = NULL;
494         gssize nread;
495         SoupBuffer *buffer;
496         guint status;
497
498         switch (io->read_state) {
499         case SOUP_MESSAGE_IO_STATE_HEADERS:
500                 if (!read_headers (msg, cancellable, error))
501                         return FALSE;
502
503                 status = io->parse_headers_cb (msg, (char *)io->read_header_buf->data,
504                                                io->read_header_buf->len,
505                                                &io->read_encoding,
506                                                io->header_data);
507                 g_byte_array_set_size (io->read_header_buf, 0);
508
509                 if (status != SOUP_STATUS_OK) {
510                         /* Either we couldn't parse the headers, or they
511                          * indicated something that would mean we wouldn't
512                          * be able to parse the body. (Eg, unknown
513                          * Transfer-Encoding.). Skip the rest of the
514                          * reading, and make sure the connection gets
515                          * closed when we're done.
516                          */
517                         soup_message_set_status (msg, status);
518                         soup_message_headers_append (msg->request_headers,
519                                                      "Connection", "close");
520                         io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING;
521                         break;
522                 }
523
524                 if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
525                     SOUP_STATUS_IS_INFORMATIONAL (msg->status_code)) {
526                         if (msg->status_code == SOUP_STATUS_CONTINUE &&
527                             io->write_state == SOUP_MESSAGE_IO_STATE_BLOCKING) {
528                                 /* Pause the reader, unpause the writer */
529                                 io->read_state =
530                                         SOUP_MESSAGE_IO_STATE_BLOCKING;
531                                 io->write_state =
532                                         SOUP_MESSAGE_IO_STATE_BODY_START;
533                         } else {
534                                 /* Just stay in HEADERS */
535                                 io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
536                         }
537
538                         /* Informational responses have no bodies, so
539                          * bail out here rather than parsing encoding, etc
540                          */
541                         soup_message_got_informational (msg);
542                         soup_message_cleanup_response (msg);
543                         break;
544                 } else if (io->mode == SOUP_MESSAGE_IO_SERVER &&
545                            soup_message_headers_get_expectations (msg->request_headers) & SOUP_EXPECTATION_CONTINUE) {
546                         /* The client requested a Continue response. The
547                          * got_headers handler may change this to something
548                          * else though.
549                          */
550                         soup_message_set_status (msg, SOUP_STATUS_CONTINUE);
551                         io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
552                         io->read_state = SOUP_MESSAGE_IO_STATE_BLOCKING;
553                 } else {
554                         io->read_state = SOUP_MESSAGE_IO_STATE_BODY_START;
555
556                         /* If the client was waiting for a Continue
557                          * but got something else, then it's done
558                          * writing.
559                          */
560                         if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
561                             io->write_state == SOUP_MESSAGE_IO_STATE_BLOCKING)
562                                 io->write_state = SOUP_MESSAGE_IO_STATE_FINISHING;
563                 }
564
565                 if (io->read_encoding == SOUP_ENCODING_CONTENT_LENGTH) {
566                         SoupMessageHeaders *hdrs =
567                                 (io->mode == SOUP_MESSAGE_IO_CLIENT) ?
568                                 msg->response_headers : msg->request_headers;
569                         io->read_length = soup_message_headers_get_content_length (hdrs);
570
571                         if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
572                             !soup_message_is_keepalive (msg)) {
573                                 /* Some servers suck and send
574                                  * incorrect Content-Length values, so
575                                  * allow EOF termination in this case
576                                  * (iff the message is too short) too.
577                                  */
578                                 io->read_encoding = SOUP_ENCODING_EOF;
579                         }
580                 } else
581                         io->read_length = -1;
582
583                 soup_message_got_headers (msg);
584                 break;
585
586
587         case SOUP_MESSAGE_IO_STATE_BODY_START:
588                 if (!io->body_istream) {
589                         GInputStream *body_istream = soup_body_input_stream_new (G_INPUT_STREAM (io->istream),
590                                                                                  io->read_encoding,
591                                                                                  io->read_length);
592
593                         /* TODO: server-side messages do not have a io->item. This means
594                          * that we cannot use content processors for them right now.
595                          */
596                         if (io->mode == SOUP_MESSAGE_IO_CLIENT) {
597                                 io->body_istream = soup_message_setup_body_istream (body_istream, msg,
598                                                                                     io->item->session,
599                                                                                     SOUP_STAGE_MESSAGE_BODY);
600                                 g_object_unref (body_istream);
601                         } else {
602                                 io->body_istream = body_istream;
603                         }
604                 }
605
606                 if (priv->sniffer) {
607                         SoupContentSnifferStream *sniffer_stream = SOUP_CONTENT_SNIFFER_STREAM (io->body_istream);
608                         const char *content_type;
609                         GHashTable *params;
610
611                         if (!soup_content_sniffer_stream_is_ready (sniffer_stream, io->blocking, cancellable, error))
612                                 return FALSE;
613
614                         content_type = soup_content_sniffer_stream_sniff (sniffer_stream, &params);
615                         soup_message_content_sniffed (msg, content_type, params);
616                 }
617
618                 io->read_state = SOUP_MESSAGE_IO_STATE_BODY;
619                 break;
620
621
622         case SOUP_MESSAGE_IO_STATE_BODY:
623                 if (priv->chunk_allocator) {
624                         buffer = priv->chunk_allocator (msg, io->read_length, priv->chunk_allocator_data);
625                         if (!buffer) {
626                                 g_return_val_if_fail (!io->item || !io->item->new_api, FALSE);
627                                 soup_message_io_pause (msg);
628                                 return FALSE;
629                         }
630                 } else {
631                         if (!stack_buf)
632                                 stack_buf = alloca (RESPONSE_BLOCK_SIZE);
633                         buffer = soup_buffer_new (SOUP_MEMORY_TEMPORARY,
634                                                   stack_buf,
635                                                   RESPONSE_BLOCK_SIZE);
636                 }
637
638                 nread = g_pollable_stream_read (io->body_istream,
639                                                 (guchar *)buffer->data,
640                                                 buffer->length,
641                                                 io->blocking,
642                                                 cancellable, error);
643                 if (nread > 0) {
644                         buffer->length = nread;
645                         soup_message_body_got_chunk (io->read_body, buffer);
646                         soup_message_got_chunk (msg, buffer);
647                         soup_buffer_free (buffer);
648                         break;
649                 }
650
651                 soup_buffer_free (buffer);
652                 if (nread == -1)
653                         return FALSE;
654
655                 /* else nread == 0 */
656                 io->read_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
657                 break;
658
659
660         case SOUP_MESSAGE_IO_STATE_BODY_DONE:
661                 io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING;
662                 soup_message_got_body (msg);
663                 break;
664
665
666         case SOUP_MESSAGE_IO_STATE_FINISHING:
667                 io->read_state = SOUP_MESSAGE_IO_STATE_DONE;
668
669                 if (io->mode == SOUP_MESSAGE_IO_SERVER)
670                         io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
671                 break;
672
673
674         default:
675                 g_return_val_if_reached (FALSE);
676         }
677
678         return TRUE;
679 }
680
681 typedef struct {
682         GSource source;
683         SoupMessage *msg;
684         gboolean paused;
685 } SoupMessageSource;
686
687 static gboolean
688 message_source_check (GSource *source)
689 {
690         SoupMessageSource *message_source = (SoupMessageSource *)source;
691
692         if (message_source->paused) {
693                 SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (message_source->msg);
694                 SoupMessageIOData *io = priv->io_data;
695
696                 if (!io || io->paused)
697                         return FALSE;
698                 else
699                         return TRUE;
700         } else
701                 return FALSE;
702 }
703
704 static gboolean
705 message_source_prepare (GSource *source,
706                         gint    *timeout)
707 {
708         *timeout = -1;
709         return message_source_check (source);
710 }
711
712 static gboolean
713 message_source_dispatch (GSource     *source,
714                          GSourceFunc  callback,
715                          gpointer     user_data)
716 {
717         SoupMessageSourceFunc func = (SoupMessageSourceFunc)callback;
718         SoupMessageSource *message_source = (SoupMessageSource *)source;
719
720         return (*func) (message_source->msg, user_data);
721 }
722
723 static void
724 message_source_finalize (GSource *source)
725 {
726         SoupMessageSource *message_source = (SoupMessageSource *)source;
727
728         g_object_unref (message_source->msg);
729 }
730
731 static gboolean
732 message_source_closure_callback (SoupMessage *msg,
733                                  gpointer     data)
734 {
735         GClosure *closure = data;
736         GValue param = G_VALUE_INIT;
737         GValue result_value = G_VALUE_INIT;
738         gboolean result;
739
740         g_value_init (&result_value, G_TYPE_BOOLEAN);
741
742         g_value_init (&param, SOUP_TYPE_MESSAGE);
743         g_value_set_object (&param, msg);
744
745         g_closure_invoke (closure, &result_value, 1, &param, NULL);
746
747         result = g_value_get_boolean (&result_value);
748         g_value_unset (&result_value);
749         g_value_unset (&param);
750
751         return result;
752 }
753
754 static GSourceFuncs message_source_funcs =
755 {
756         message_source_prepare,
757         message_source_check,
758         message_source_dispatch,
759         message_source_finalize,
760         (GSourceFunc)message_source_closure_callback,
761         (GSourceDummyMarshal)g_cclosure_marshal_generic,
762 };
763
764 GSource *
765 soup_message_io_get_source (SoupMessage *msg, GCancellable *cancellable,
766                             SoupMessageSourceFunc callback, gpointer user_data)
767 {
768         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
769         SoupMessageIOData *io = priv->io_data;
770         GSource *base_source, *source;
771         SoupMessageSource *message_source;
772
773         if (!io) {
774                 base_source = g_timeout_source_new (0);
775         } else if (io->paused) {
776                 base_source = NULL;
777         } else if (SOUP_MESSAGE_IO_STATE_POLLABLE (io->read_state)) {
778                 GPollableInputStream *istream;
779
780                 if (io->body_istream)
781                         istream = G_POLLABLE_INPUT_STREAM (io->body_istream);
782                 else
783                         istream = G_POLLABLE_INPUT_STREAM (io->istream);
784                 base_source = g_pollable_input_stream_create_source (istream, cancellable);
785         } else if (SOUP_MESSAGE_IO_STATE_POLLABLE (io->write_state)) {
786                 GPollableOutputStream *ostream;
787
788                 if (io->body_ostream)
789                         ostream = G_POLLABLE_OUTPUT_STREAM (io->body_ostream);
790                 else
791                         ostream = G_POLLABLE_OUTPUT_STREAM (io->ostream);
792                 base_source = g_pollable_output_stream_create_source (ostream, cancellable);
793         } else
794                 base_source = g_timeout_source_new (0);
795
796         source = g_source_new (&message_source_funcs,
797                                sizeof (SoupMessageSource));
798         g_source_set_name (source, "SoupMessageSource");
799         message_source = (SoupMessageSource *)source;
800         message_source->msg = g_object_ref (msg);
801         message_source->paused = io && io->paused;
802
803         if (base_source) {
804                 g_source_set_dummy_callback (base_source);
805                 g_source_add_child_source (source, base_source);
806                 g_source_unref (base_source);
807         }
808         g_source_set_callback (source, (GSourceFunc) callback, user_data, NULL);
809         return source;
810 }
811
812 static gboolean
813 request_is_restartable (SoupMessage *msg, GError *error)
814 {
815         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
816         SoupMessageIOData *io = priv->io_data;
817
818         if (!io)
819                 return FALSE;
820
821         return (io->mode == SOUP_MESSAGE_IO_CLIENT &&
822                 io->read_state <= SOUP_MESSAGE_IO_STATE_HEADERS &&
823                 io->read_header_buf->len == 0 &&
824                 soup_connection_get_ever_used (io->item->conn) &&
825                 !g_error_matches (error, G_IO_ERROR, G_IO_ERROR_TIMED_OUT) &&
826                 !g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK) &&
827                 error->domain != G_TLS_ERROR &&
828                 SOUP_METHOD_IS_IDEMPOTENT (msg->method));
829 }
830
831 static gboolean
832 io_run_until (SoupMessage *msg,
833               SoupMessageIOState read_state, SoupMessageIOState write_state,
834               GCancellable *cancellable, GError **error)
835 {
836         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
837         SoupMessageIOData *io = priv->io_data;
838         gboolean progress = TRUE, done;
839         GError *my_error = NULL;
840
841         if (g_cancellable_set_error_if_cancelled (cancellable, error))
842                 return FALSE;
843         else if (!io) {
844                 g_set_error_literal (error, G_IO_ERROR,
845                                      G_IO_ERROR_CANCELLED,
846                                      _("Operation was cancelled"));
847                 return FALSE;
848         }
849
850         g_object_ref (msg);
851
852         while (progress && priv->io_data == io && !io->paused &&
853                (io->read_state < read_state || io->write_state < write_state)) {
854
855                 if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state))
856                         progress = io_read (msg, cancellable, &my_error);
857                 else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state))
858                         progress = io_write (msg, cancellable, &my_error);
859                 else
860                         progress = FALSE;
861         }
862
863         if (my_error) {
864                 if (request_is_restartable (msg, my_error)) {
865                         /* Connection got closed, but we can safely try again */
866                         g_error_free (my_error);
867                         g_set_error_literal (error, SOUP_HTTP_ERROR,
868                                              SOUP_STATUS_TRY_AGAIN, "");
869                         g_object_unref (msg);
870                         return FALSE;
871                 }
872
873                 g_propagate_error (error, my_error);
874                 g_object_unref (msg);
875                 return FALSE;
876         } else if (g_cancellable_set_error_if_cancelled (cancellable, error)) {
877                 g_object_unref (msg);
878                 return FALSE;
879         } else if (priv->io_data != io) {
880                 g_set_error_literal (error, G_IO_ERROR,
881                                      G_IO_ERROR_CANCELLED,
882                                      _("Operation was cancelled"));
883                 g_object_unref (msg);
884                 return FALSE;
885         }
886
887         done = (io->read_state >= read_state &&
888                 io->write_state >= write_state);
889
890         if (io->paused && !done) {
891                 g_set_error_literal (error, G_IO_ERROR,
892                                      G_IO_ERROR_WOULD_BLOCK,
893                                      _("Operation would block"));
894                 g_object_unref (msg);
895                 return FALSE;
896         }
897
898         g_object_unref (msg);
899         return done;
900 }
901
902 static gboolean
903 io_run (SoupMessage *msg, gpointer user_data)
904 {
905         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
906         SoupMessageIOData *io = priv->io_data;
907         GError *error = NULL;
908         GCancellable *cancellable;
909
910         if (io->io_source) {
911                 g_source_destroy (io->io_source);
912                 g_source_unref (io->io_source);
913                 io->io_source = NULL;
914         }
915
916         g_object_ref (msg);
917         cancellable = io->cancellable ? g_object_ref (io->cancellable) : NULL;
918
919         if (io_run_until (msg,
920                           SOUP_MESSAGE_IO_STATE_DONE,
921                           SOUP_MESSAGE_IO_STATE_DONE,
922                           cancellable, &error)) {
923                 soup_message_io_finished (msg);
924         } else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
925                 g_clear_error (&error);
926                 io->io_source = soup_message_io_get_source (msg, NULL, io_run, msg);
927                 g_source_attach (io->io_source, io->async_context);
928         } else if (error && priv->io_data == io) {
929                 if (g_error_matches (error, SOUP_HTTP_ERROR, SOUP_STATUS_TRY_AGAIN))
930                         io->item->state = SOUP_MESSAGE_RESTARTING;
931                 else if (error->domain == G_TLS_ERROR) {
932                         soup_message_set_status_full (msg,
933                                                       SOUP_STATUS_SSL_FAILED,
934                                                       error->message);
935                 } else if (!SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code))
936                         soup_message_set_status (msg, SOUP_STATUS_IO_ERROR);
937
938                 g_error_free (error);
939                 soup_message_io_finished (msg);
940         } else if (error)
941                 g_error_free (error);
942
943         g_object_unref (msg);
944         g_clear_object (&cancellable);
945
946         return FALSE;
947 }
948
949 gboolean
950 soup_message_io_run_until_write (SoupMessage *msg,
951                                  GCancellable *cancellable, GError **error)
952 {
953         return io_run_until (msg,
954                              SOUP_MESSAGE_IO_STATE_ANY,
955                              SOUP_MESSAGE_IO_STATE_BODY,
956                              cancellable, error);
957 }
958
959 gboolean
960 soup_message_io_run_until_read (SoupMessage *msg,
961                                 GCancellable *cancellable, GError **error)
962 {
963         return io_run_until (msg,
964                              SOUP_MESSAGE_IO_STATE_BODY,
965                              SOUP_MESSAGE_IO_STATE_ANY,
966                              cancellable, error);
967 }
968
969 gboolean
970 soup_message_io_run_until_finish (SoupMessage   *msg,
971                                   GCancellable  *cancellable,
972                                   GError       **error)
973 {
974         g_object_ref (msg);
975
976         if (!io_run_until (msg,
977                            SOUP_MESSAGE_IO_STATE_DONE,
978                            SOUP_MESSAGE_IO_STATE_DONE,
979                            cancellable, error)) {
980                 g_object_unref (msg);
981                 return FALSE;
982         }
983
984         soup_message_io_finished (msg);
985         g_object_unref (msg);
986         return TRUE;
987 }
988
989 static void
990 client_stream_eof (SoupClientInputStream *stream, gpointer user_data)
991 {
992         SoupMessage *msg = user_data;
993         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
994         SoupMessageIOData *io = priv->io_data;
995
996         if (io && io->read_state == SOUP_MESSAGE_IO_STATE_BODY)
997                 io->read_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
998 }
999
1000 GInputStream *
1001 soup_message_io_get_response_istream (SoupMessage  *msg,
1002                                       GError      **error)
1003 {
1004         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
1005         SoupMessageIOData *io = priv->io_data;
1006         GInputStream *client_stream;
1007
1008         g_return_val_if_fail (io->mode == SOUP_MESSAGE_IO_CLIENT, NULL);
1009
1010         if (SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code)) {
1011                 g_set_error_literal (error, SOUP_HTTP_ERROR,
1012                                      msg->status_code, msg->reason_phrase);
1013                 return NULL;
1014         }
1015
1016         client_stream = soup_client_input_stream_new (io->body_istream, msg);
1017         g_signal_connect (client_stream, "eof",
1018                           G_CALLBACK (client_stream_eof), msg);
1019
1020         return client_stream;
1021 }
1022
1023
1024 static SoupMessageIOData *
1025 new_iostate (SoupMessage *msg, GIOStream *iostream,
1026              GMainContext *async_context, SoupMessageIOMode mode,
1027              SoupMessageGetHeadersFn get_headers_cb,
1028              SoupMessageParseHeadersFn parse_headers_cb,
1029              gpointer header_data,
1030              SoupMessageCompletionFn completion_cb,
1031              gpointer completion_data)
1032 {
1033         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
1034         SoupMessageIOData *io;
1035
1036         io = g_slice_new0 (SoupMessageIOData);
1037         io->mode = mode;
1038         io->get_headers_cb   = get_headers_cb;
1039         io->parse_headers_cb = parse_headers_cb;
1040         io->header_data      = header_data;
1041         io->completion_cb    = completion_cb;
1042         io->completion_data  = completion_data;
1043
1044         io->iostream = g_object_ref (iostream);
1045         io->istream = SOUP_FILTER_INPUT_STREAM (g_io_stream_get_input_stream (iostream));
1046         io->ostream = g_io_stream_get_output_stream (iostream);
1047
1048         if (async_context) {
1049                 io->async_context = g_main_context_ref (async_context);
1050                 io->blocking = FALSE;
1051         } else
1052                 io->blocking = TRUE;
1053
1054         io->read_header_buf = g_byte_array_new ();
1055         io->write_buf       = g_string_new (NULL);
1056
1057         io->read_state  = SOUP_MESSAGE_IO_STATE_NOT_STARTED;
1058         io->write_state = SOUP_MESSAGE_IO_STATE_NOT_STARTED;
1059
1060         if (priv->io_data)
1061                 soup_message_io_cleanup (msg);
1062         priv->io_data = io;
1063         return io;
1064 }
1065
1066 void
1067 soup_message_io_client (SoupMessageQueueItem *item,
1068                         GIOStream *iostream,
1069                         GMainContext *async_context,
1070                         SoupMessageGetHeadersFn get_headers_cb,
1071                         SoupMessageParseHeadersFn parse_headers_cb,
1072                         gpointer header_data,
1073                         SoupMessageCompletionFn completion_cb,
1074                         gpointer completion_data)
1075 {
1076         SoupMessageIOData *io;
1077
1078         io = new_iostate (item->msg, iostream, async_context,
1079                           SOUP_MESSAGE_IO_CLIENT,
1080                           get_headers_cb, parse_headers_cb, header_data,
1081                           completion_cb, completion_data);
1082
1083         io->item = item;
1084         soup_message_queue_item_ref (item);
1085         io->cancellable = item->cancellable;
1086
1087         io->read_body       = item->msg->response_body;
1088         io->write_body      = item->msg->request_body;
1089
1090         io->write_state     = SOUP_MESSAGE_IO_STATE_HEADERS;
1091         if (!item->new_api)
1092                 io_run (item->msg, NULL);
1093 }
1094
1095 void
1096 soup_message_io_server (SoupMessage *msg,
1097                         GIOStream *iostream, GMainContext *async_context,
1098                         SoupMessageGetHeadersFn get_headers_cb,
1099                         SoupMessageParseHeadersFn parse_headers_cb,
1100                         gpointer header_data,
1101                         SoupMessageCompletionFn completion_cb,
1102                         gpointer completion_data)
1103 {
1104         SoupMessageIOData *io;
1105
1106         io = new_iostate (msg, iostream, async_context,
1107                           SOUP_MESSAGE_IO_SERVER,
1108                           get_headers_cb, parse_headers_cb, header_data,
1109                           completion_cb, completion_data);
1110
1111         io->read_body       = msg->request_body;
1112         io->write_body      = msg->response_body;
1113
1114         io->read_state      = SOUP_MESSAGE_IO_STATE_HEADERS;
1115         io_run (msg, NULL);
1116 }
1117
1118 void  
1119 soup_message_io_pause (SoupMessage *msg)
1120 {
1121         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
1122         SoupMessageIOData *io = priv->io_data;
1123
1124         g_return_if_fail (io != NULL);
1125
1126         if (io->item && io->item->new_api)
1127                 g_return_if_fail (io->read_state < SOUP_MESSAGE_IO_STATE_BODY);
1128
1129         if (io->io_source) {
1130                 g_source_destroy (io->io_source);
1131                 g_source_unref (io->io_source);
1132                 io->io_source = NULL;
1133         }
1134
1135         if (io->unpause_source) {
1136                 g_source_destroy (io->unpause_source);
1137                 io->unpause_source = NULL;
1138         }
1139
1140         io->paused = TRUE;
1141 }
1142
1143 static gboolean
1144 io_unpause_internal (gpointer msg)
1145 {
1146         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
1147         SoupMessageIOData *io = priv->io_data;
1148
1149         g_return_val_if_fail (io != NULL, FALSE);
1150         io->unpause_source = NULL;
1151         io->paused = FALSE;
1152
1153         if (io->io_source)
1154                 return FALSE;
1155
1156         io_run (msg, NULL);
1157         return FALSE;
1158 }
1159
1160 void
1161 soup_message_io_unpause (SoupMessage *msg)
1162 {
1163         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
1164         SoupMessageIOData *io = priv->io_data;
1165
1166         g_return_if_fail (io != NULL);
1167
1168         if (io->item && io->item->new_api) {
1169                 g_return_if_fail (io->read_state < SOUP_MESSAGE_IO_STATE_BODY);
1170                 io->paused = FALSE;
1171                 return;
1172         }
1173
1174         if (!io->blocking) {
1175                 if (!io->unpause_source) {
1176                         io->unpause_source = soup_add_completion (
1177                                 io->async_context, io_unpause_internal, msg);
1178                 }
1179         } else
1180                 io_unpause_internal (msg);
1181 }
1182
1183 /**
1184  * soup_message_io_in_progress:
1185  * @msg: a #SoupMessage
1186  *
1187  * Tests whether or not I/O is currently in progress on @msg.
1188  *
1189  * Return value: whether or not I/O is currently in progress.
1190  **/
1191 gboolean
1192 soup_message_io_in_progress (SoupMessage *msg)
1193 {
1194         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
1195
1196         return priv->io_data != NULL;
1197 }