1 /* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
5 * Copyright (C) 2000-2003, Ximian, Inc.
12 #define LIBSOUP_I_HAVE_READ_BUG_594377_AND_KNOW_SOUP_PASSWORD_MANAGER_MIGHT_GO_AWAY
14 #include "soup-address.h"
15 #include "soup-session-async.h"
16 #include "soup-session-private.h"
17 #include "soup-address.h"
18 #include "soup-message-private.h"
19 #include "soup-message-queue.h"
20 #include "soup-misc.h"
21 #include "soup-password-manager.h"
25 * SECTION:soup-session-async
26 * @short_description: Soup session for asynchronous (main-loop-based) I/O.
28 * #SoupSessionAsync is an implementation of #SoupSession that uses
29 * non-blocking I/O via the glib main loop. It is intended for use in
30 * single-threaded programs.
/* Forward declarations: file-local helpers, plus the functions that
 * soup_session_async_class_init() installs as SoupSession virtual methods
 * (queue_message, send_message, cancel_message, auth_required, kick).
 */
33 static void run_queue (SoupSessionAsync *sa);
34 static void do_idle_run_queue (SoupSession *session);
36 static void send_request_running (SoupSession *session, SoupMessageQueueItem *item);
37 static void send_request_restarted (SoupSession *session, SoupMessageQueueItem *item);
38 static void send_request_finished (SoupSession *session, SoupMessageQueueItem *item);
40 static void queue_message (SoupSession *session, SoupMessage *req,
41 SoupSessionCallback callback, gpointer user_data);
42 static guint send_message (SoupSession *session, SoupMessage *req);
43 static void cancel_message (SoupSession *session, SoupMessage *msg,
45 static void kick (SoupSession *session);
47 static void auth_required (SoupSession *session, SoupMessage *msg,
48 SoupAuth *auth, gboolean retrying);
50 G_DEFINE_TYPE (SoupSessionAsync, soup_session_async, SOUP_TYPE_SESSION)
56 } SoupSessionAsyncPrivate;
57 #define SOUP_SESSION_ASYNC_GET_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), SOUP_TYPE_SESSION_ASYNC, SoupSessionAsyncPrivate))
/* Instance initializer named by G_DEFINE_TYPE above. Fetches the
 * SoupSessionAsyncPrivate data belonging to @sa.
 */
60 soup_session_async_init (SoupSessionAsync *sa)
62 SoupSessionAsyncPrivate *priv = SOUP_SESSION_ASYNC_GET_PRIVATE (sa);
/* GObject dispose handler (installed in soup_session_async_class_init()).
 * Flags the private data as disposed and then chains up to the parent
 * class's dispose.
 */
68 dispose (GObject *object)
70 SoupSessionAsyncPrivate *priv = SOUP_SESSION_ASYNC_GET_PRIVATE (object);
/* Set the flag before chaining up. */
72 priv->disposed = TRUE;
74 G_OBJECT_CLASS (soup_session_async_parent_class)->dispose (object);
/* Class initializer: registers the per-instance private struct, overrides
 * the SoupSession virtual methods with the implementations in this file,
 * and installs the GObject dispose handler.
 */
78 soup_session_async_class_init (SoupSessionAsyncClass *soup_session_async_class)
80 SoupSessionClass *session_class = SOUP_SESSION_CLASS (soup_session_async_class);
81 GObjectClass *object_class = G_OBJECT_CLASS (session_class);
/* Reserve room for SoupSessionAsyncPrivate in every instance. */
83 g_type_class_add_private (soup_session_async_class,
84 sizeof (SoupSessionAsyncPrivate));
86 /* virtual method override */
87 session_class->queue_message = queue_message;
88 session_class->send_message = send_message;
89 session_class->cancel_message = cancel_message;
90 session_class->auth_required = auth_required;
91 session_class->kick = kick;
93 object_class->dispose = dispose;
98 * soup_session_async_new:
100 * Creates an asynchronous #SoupSession with the default options.
102 * Return value: the new session.
105 soup_session_async_new (void)
/* Construct the async session type without setting any properties. */
107 return g_object_new (SOUP_TYPE_SESSION_ASYNC, NULL);
111 * soup_session_async_new_with_options:
112 * @optname1: name of first property to set
113 * @...: value of @optname1, followed by additional property/value pairs
115 * Creates an asynchronous #SoupSession with the specified options.
117 * Return value: the new session.
120 soup_session_async_new_with_options (const char *optname1, ...)
122 SoupSession *session;
/* Hand the caller's property name/value varargs, starting at @optname1,
 * to g_object_new_valist().
 */
125 va_start (ap, optname1);
126 session = (SoupSession *)g_object_new_valist (SOUP_TYPE_SESSION_ASYNC,
/* "disconnected" signal handler for a SoupConnection. @session is the
 * SoupSession that was supplied as user data when the handler was connected
 * (see ssl_tunnel_completed() and got_connection()). Schedules a queue run.
 */
134 connection_closed (SoupConnection *conn, gpointer session)
136 /* Run the queue in case anyone was waiting for a connection
139 do_idle_run_queue (session);
/* Completion callback that process_queue_item() passes to
 * soup_session_send_queue_item(). @user_data is the SoupMessageQueueItem.
 * Schedules a queue run and, unless the item is restarting, moves it to
 * SOUP_MESSAGE_FINISHING.
 */
143 message_completed (SoupMessage *msg, gpointer user_data)
145 SoupMessageQueueItem *item = user_data;
147 do_idle_run_queue (item->session);
/* A restarting item keeps its state so the queue can restart it. */
149 if (item->state != SOUP_MESSAGE_RESTARTING)
150 item->state = SOUP_MESSAGE_FINISHING;
/* Common tear-down for a tunnel (CONNECT) queue item, called from
 * ssl_tunnel_completed() and tunnel_message_completed(). @item is the tunnel
 * item; item->related is the queue item that was waiting for the tunnel.
 * Finishes the tunnel message, updates the related item, schedules a queue
 * run, unqueues the tunnel item and drops the references released below.
 */
154 tunnel_complete (SoupMessageQueueItem *item)
156 SoupSession *session = item->session;
158 soup_message_finished (item->msg);
/* A status code already set on the related message sends that message
 * to FINISHING.
 */
159 if (item->related->msg->status_code)
160 item->related->state = SOUP_MESSAGE_FINISHING;
162 soup_message_set_https_status (item->related->msg, item->conn);
164 do_idle_run_queue (session);
165 soup_message_queue_item_unref (item->related);
166 soup_session_unqueue_item (session, item);
167 soup_message_queue_item_unref (item);
168 g_object_unref (session);
/* Callback passed to soup_connection_start_ssl_async() by
 * tunnel_message_completed(). @user_data is the tunnel queue item.
 *
 * On a successful @status the connection is watched for "disconnected" and
 * the related item becomes SOUP_MESSAGE_READY. The function also contains a
 * path that disconnects the connection and sets SOUP_STATUS_SSL_FAILED on the
 * related message (presumably the failure branch -- TODO confirm).
 * tunnel_complete() is called at the end.
 */
172 ssl_tunnel_completed (SoupConnection *conn, guint status, gpointer user_data)
174 SoupMessageQueueItem *item = user_data;
176 if (SOUP_STATUS_IS_SUCCESSFUL (status)) {
177 g_signal_connect (item->conn, "disconnected",
178 G_CALLBACK (connection_closed), item->session);
/* The state is set to IDLE and then immediately to IN_USE. */
179 soup_connection_set_state (item->conn, SOUP_CONNECTION_IDLE);
180 soup_connection_set_state (item->conn, SOUP_CONNECTION_IN_USE);
182 item->related->state = SOUP_MESSAGE_READY;
185 soup_connection_disconnect (item->conn);
186 soup_message_set_status (item->related->msg, SOUP_STATUS_SSL_FAILED);
189 tunnel_complete (item);
/* Completion callback for the CONNECT message that got_connection() sends
 * for a tunnelled connection. @user_data is the tunnel queue item.
 *
 * Handles a restarted tunnel message, an unsuccessful CONNECT (propagated to
 * item->related, then tunnel_complete()), and otherwise starts SSL on the
 * connection with ssl_tunnel_completed() as the callback.
 */
193 tunnel_message_completed (SoupMessage *msg, gpointer user_data)
195 SoupMessageQueueItem *item = user_data;
196 SoupSession *session = item->session;
/* Restart handling: the message is reset, and it can be re-sent with this
 * same function as its completion callback.
 */
198 if (item->state == SOUP_MESSAGE_RESTARTING) {
199 soup_message_restarted (msg);
201 item->state = SOUP_MESSAGE_RUNNING;
202 soup_session_send_queue_item (session, item, tunnel_message_completed);
206 soup_message_set_status (msg, SOUP_STATUS_TRY_AGAIN);
209 item->state = SOUP_MESSAGE_FINISHED;
/* Unsuccessful CONNECT: drop the connection and pass the outcome on to the
 * related item. SOUP_STATUS_TRY_AGAIN sends that item back to
 * AWAITING_CONNECTION with no connection attached.
 */
211 if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code)) {
213 soup_connection_disconnect (item->conn);
214 if (msg->status_code == SOUP_STATUS_TRY_AGAIN) {
215 item->related->state = SOUP_MESSAGE_AWAITING_CONNECTION;
216 soup_message_queue_item_set_connection (item->related, NULL);
218 soup_message_set_status (item->related->msg, msg->status_code);
220 tunnel_complete (item);
/* Begin the SSL handshake on item->conn; ssl_tunnel_completed() receives
 * the result.
 */
224 soup_connection_start_ssl_async (item->conn, item->cancellable,
225 ssl_tunnel_completed, item);
/* Callback passed to soup_connection_connect_async() by process_queue_item().
 * @user_data is the queue item being connected. process_queue_item() takes a
 * reference on both the item and the session before starting the connect;
 * the paths below that unref both release those references.
 */
229 got_connection (SoupConnection *conn, guint status, gpointer user_data)
231 SoupMessageQueueItem *item = user_data;
232 SoupSession *session = item->session;
/* The item is no longer in CONNECTING state: discard this connection and
 * just schedule a queue run.
 */
234 if (item->state != SOUP_MESSAGE_CONNECTING) {
235 soup_connection_disconnect (conn);
236 do_idle_run_queue (session);
237 soup_message_queue_item_unref (item);
238 g_object_unref (session);
242 soup_message_set_https_status (item->msg, conn);
/* Connect did not return SOUP_STATUS_OK: drop the connection.
 * SOUP_STATUS_TRY_AGAIN detaches the connection and returns the item to
 * AWAITING_CONNECTION; the code also records @status on the item and moves
 * it to FINISHING.
 */
244 if (status != SOUP_STATUS_OK) {
245 soup_connection_disconnect (conn);
247 if (status == SOUP_STATUS_TRY_AGAIN) {
248 soup_message_queue_item_set_connection (item, NULL);
249 item->state = SOUP_MESSAGE_AWAITING_CONNECTION;
251 soup_session_set_item_status (session, item, status);
252 item->state = SOUP_MESSAGE_FINISHING;
255 do_idle_run_queue (session);
256 soup_message_queue_item_unref (item);
257 g_object_unref (session);
/* Tunnelled connection: create a CONNECT queue item linked back to this
 * item through ->related and send it; tunnel_message_completed() continues
 * from there.
 */
261 if (soup_connection_is_tunnelled (conn)) {
262 SoupMessageQueueItem *tunnel_item;
264 item->state = SOUP_MESSAGE_TUNNELING;
266 tunnel_item = soup_session_make_connect_message (session, conn);
267 tunnel_item->related = item;
268 soup_session_send_queue_item (session, tunnel_item, tunnel_message_completed);
/* Mark the item ready, watch the connection for "disconnected", and run
 * the queue right away.
 */
272 item->state = SOUP_MESSAGE_READY;
273 g_signal_connect (conn, "disconnected",
274 G_CALLBACK (connection_closed), session);
275 run_queue ((SoupSessionAsync *)session);
276 soup_message_queue_item_unref (item);
277 g_object_unref (session);
/* Advances @item through the message state machine (the switch on
 * item->state below).
 *
 * @should_prune is forwarded to soup_session_get_connection(). The closing
 * `while (loop && ...)` condition shows that the body repeats while `loop` is
 * set and the item has not reached SOUP_MESSAGE_FINISHED. run_queue() passes
 * TRUE and cancel_message() passes FALSE as the third argument.
 */
281 process_queue_item (SoupMessageQueueItem *item,
282 gboolean *should_prune,
285 SoupSession *session = item->session;
287 if (item->async_context != soup_session_get_async_context (session))
294 switch (item->state) {
295 case SOUP_MESSAGE_STARTING:
296 item->state = SOUP_MESSAGE_AWAITING_CONNECTION;
299 case SOUP_MESSAGE_AWAITING_CONNECTION:
/* Ask the session for a connection for this item. */
300 if (!soup_session_get_connection (session, item, should_prune))
/* A connection that is not in the NEW state makes the item READY. */
303 if (soup_connection_get_state (item->conn) != SOUP_CONNECTION_NEW) {
304 item->state = SOUP_MESSAGE_READY;
/* Start an async connect. The item and session references taken here are
 * released in got_connection().
 */
308 item->state = SOUP_MESSAGE_CONNECTING;
309 soup_message_queue_item_ref (item);
310 g_object_ref (session);
311 soup_connection_connect_async (item->conn, item->cancellable,
312 got_connection, item);
315 case SOUP_MESSAGE_READY:
/* Send the message with message_completed() as its completion callback. */
316 item->state = SOUP_MESSAGE_RUNNING;
317 soup_session_send_queue_item (session, item, message_completed);
319 send_request_running (session, item);
322 case SOUP_MESSAGE_RESTARTING:
/* Go back to STARTING and reset the message. */
323 item->state = SOUP_MESSAGE_STARTING;
324 soup_message_restarted (item->msg);
326 send_request_restarted (session, item);
329 case SOUP_MESSAGE_FINISHING:
/* Mark FINISHED and call soup_message_finished(). The state is checked
 * again right after that call, which suggests the call can change it
 * (e.g. a requeue) -- TODO confirm.
 */
330 item->state = SOUP_MESSAGE_FINISHED;
331 soup_message_finished (item->msg);
332 if (item->state != SOUP_MESSAGE_FINISHED)
/* Hold a session reference across unqueueing and the completion
 * callback. The message is unreffed afterwards and another queue run is
 * scheduled.
 */
335 g_object_ref (session);
336 soup_session_unqueue_item (session, item);
338 item->callback (session, item->msg, item->callback_data);
339 else if (item->new_api)
340 send_request_finished (session, item);
341 g_object_unref (item->msg);
342 do_idle_run_queue (session);
343 g_object_unref (session);
347 /* Nothing to do with this message in any
352 } while (loop && item->state != SOUP_MESSAGE_FINISHED);
/* Walks the session's message queue and calls process_queue_item() on every
 * message whose method is not CONNECT. The session is reffed for the
 * duration of the run. soup_session_cleanup_connections() is called once
 * with FALSE up front and, when `try_pruning && should_prune`, again with
 * TRUE.
 */
356 run_queue (SoupSessionAsync *sa)
358 SoupSession *session = SOUP_SESSION (sa);
359 SoupMessageQueue *queue = soup_session_get_queue (session);
360 SoupMessageQueueItem *item;
362 gboolean try_pruning = TRUE, should_prune = FALSE;
364 g_object_ref (session);
365 soup_session_cleanup_connections (session, FALSE);
/* Iterate over the queued items. */
368 for (item = soup_message_queue_first (queue);
370 item = soup_message_queue_next (queue, item)) {
373 /* CONNECT messages are handled specially */
374 if (msg->method != SOUP_METHOD_CONNECT)
375 process_queue_item (item, &should_prune, TRUE);
378 if (try_pruning && should_prune) {
379 /* There is at least one message in the queue that
380 * could be sent if we pruned an idle connection from
383 if (soup_session_cleanup_connections (session, TRUE)) {
384 try_pruning = should_prune = FALSE;
389 g_object_unref (session);
/* Source callback installed by do_idle_run_queue(). @user_data is the
 * session's SoupSessionAsyncPrivate. Destroys the current source and then
 * runs the queue for priv->sa.
 */
393 idle_run_queue (gpointer user_data)
395 SoupSessionAsyncPrivate *priv = user_data;
400 /* Ensure that the source is destroyed before running the queue */
401 g_source_destroy (g_main_current_source ());
403 run_queue (priv->sa);
/* Schedules idle_run_queue() on the session's async context. The context is
 * searched for an existing source whose user data is the private struct, and
 * a completion source is added with soup_add_completion() (presumably only
 * when no such source is already pending -- TODO confirm).
 */
408 do_idle_run_queue (SoupSession *session)
410 SoupSessionAsyncPrivate *priv = SOUP_SESSION_ASYNC_GET_PRIVATE (session);
411 GMainContext *async_context = soup_session_get_async_context (session);
417 /* We use priv rather than session as the source data, because
418 * other parts of libsoup (or the calling app) may have sources
419 * using the session as the source data.
422 source = g_main_context_find_source_by_user_data (async_context, priv);
426 source = soup_add_completion (async_context, idle_run_queue, priv);
/* SoupSession queue_message implementation: chains up to the parent class to
 * queue @req with @callback and @user_data, then schedules a queue run.
 */
430 queue_message (SoupSession *session, SoupMessage *req,
431 SoupSessionCallback callback, gpointer user_data)
433 SOUP_SESSION_CLASS (soup_session_async_parent_class)->queue_message (session, req, callback, user_data);
435 do_idle_run_queue (session);
/* SoupSession send_message implementation: queues @req without a callback
 * and iterates the session's main context until the queue item reaches
 * SOUP_MESSAGE_FINISHED.
 *
 * Returns req->status_code, or SOUP_STATUS_MALFORMED (via
 * g_return_val_if_fail) if no queue item is found for @req.
 */
439 send_message (SoupSession *session, SoupMessage *req)
441 SoupMessageQueueItem *item;
442 GMainContext *async_context =
443 soup_session_get_async_context (session);
445 /* Balance out the unref that queuing will eventually do */
448 queue_message (session, req, NULL, NULL);
450 item = soup_message_queue_lookup (soup_session_get_queue (session), req);
451 g_return_val_if_fail (item != NULL, SOUP_STATUS_MALFORMED);
/* Block in the main context (may_block = TRUE) until the item finishes. */
453 while (item->state != SOUP_MESSAGE_FINISHED)
454 g_main_context_iteration (async_context, TRUE);
/* Drop the item reference used above. */
456 soup_message_queue_item_unref (item);
458 return req->status_code;
/* SoupSession cancel_message implementation. Chains up to the parent class,
 * then looks up the queue item for @msg and pushes it towards completion
 * within this call: in-progress message I/O is finished, or the item is moved
 * to FINISHING, and process_queue_item() is then called once with its loop
 * argument FALSE.
 */
462 cancel_message (SoupSession *session, SoupMessage *msg,
465 SoupMessageQueue *queue;
466 SoupMessageQueueItem *item;
469 SOUP_SESSION_CLASS (soup_session_async_parent_class)->
470 cancel_message (session, msg, status_code);
472 queue = soup_session_get_queue (session);
473 item = soup_message_queue_lookup (queue, msg);
477 /* Force it to finish immediately, so that
478 * soup_session_abort (session); g_object_unref (session);
479 * will work. (The soup_session_cancel_message() docs
480 * point out that the callback will be invoked from
481 * within the cancel call.)
483 if (soup_message_io_in_progress (msg))
484 soup_message_io_finished (msg);
485 else if (item->state != SOUP_MESSAGE_FINISHED)
486 item->state = SOUP_MESSAGE_FINISHING;
488 if (item->state != SOUP_MESSAGE_FINISHED)
489 process_queue_item (item, &dummy, FALSE);
491 soup_message_queue_item_unref (item);
/* Callback passed to soup_password_manager_get_passwords_async() by
 * auth_required(). @session is the SoupSession supplied as user data.
 * Unpauses @msg, chains up to the parent class's auth_required, and drops a
 * reference on @auth (presumably one taken before the async call -- TODO
 * confirm).
 */
495 got_passwords (SoupPasswordManager *password_manager, SoupMessage *msg,
496 SoupAuth *auth, gboolean retrying, gpointer session)
498 soup_session_unpause_message (session, msg);
499 SOUP_SESSION_CLASS (soup_session_async_parent_class)->
500 auth_required (session, msg, auth, retrying);
501 g_object_unref (auth);
/* SoupSession auth_required implementation. If a SoupPasswordManager feature
 * applies to @msg, the message is paused and passwords are requested
 * asynchronously; got_passwords() then unpauses it and chains up. The
 * function also chains up to the parent class directly (presumably when
 * there is no password manager -- TODO confirm).
 */
505 auth_required (SoupSession *session, SoupMessage *msg,
506 SoupAuth *auth, gboolean retrying)
508 SoupSessionFeature *password_manager;
510 password_manager = soup_session_get_feature_for_message (
511 session, SOUP_TYPE_PASSWORD_MANAGER, msg);
512 if (password_manager) {
/* Hold the message while the password lookup is in flight. */
513 soup_session_pause_message (session, msg);
515 soup_password_manager_get_passwords_async (
516 SOUP_PASSWORD_MANAGER (password_manager),
518 soup_session_get_async_context (session),
519 NULL, /* FIXME cancellable */
520 got_passwords, session);
522 SOUP_SESSION_CLASS (soup_session_async_parent_class)->
523 auth_required (session, msg, auth, retrying);
/* SoupSession kick implementation: schedules a queue run. */
528 kick (SoupSession *session)
530 do_idle_run_queue (session);
/* Completes the GSimpleAsyncResult stored in item->result and drops a
 * reference to it.
 *
 * The result receives @error (ownership taken), or an error built from the
 * message's status code and reason phrase when that status is a transport
 * error (with @stream unreffed), or @stream as its operation result.
 */
535 send_request_return_result (SoupMessageQueueItem *item,
536 gpointer stream, GError *error)
538 GSimpleAsyncResult *simple;
540 simple = item->result;
544 g_simple_async_result_take_error (simple, error);
545 else if (SOUP_STATUS_IS_TRANSPORT_ERROR (item->msg->status_code)) {
547 g_object_unref (stream);
548 g_simple_async_result_set_error (simple,
550 item->msg->status_code,
552 item->msg->reason_phrase);
/* Store @stream as the op result, with g_object_unref as its destroy
 * notify.
 */
554 g_simple_async_result_set_op_res_gpointer (simple, stream, g_object_unref);
556 g_simple_async_result_complete (simple);
557 g_object_unref (simple);
/* Called from process_queue_item() when an item is restarted. Clears the
 * "SoupSessionAsync:ostream" data attached to the message by
 * send_async_maybe_complete() and resets item->io_started.
 */
561 send_request_restarted (SoupSession *session, SoupMessageQueueItem *item)
563 /* We won't be needing this, then. */
564 g_object_set_data (G_OBJECT (item->msg), "SoupSessionAsync:ostream", NULL);
565 item->io_started = FALSE;
/* Called from process_queue_item() for items with new_api set, after the
 * message has been unqueued. Builds the input stream to hand back (the
 * buffered body saved under "SoupSessionAsync:ostream", or an empty memory
 * input stream) and passes it to send_request_return_result(). The existing
 * comments below describe the cases where nothing is returned from here.
 */
569 send_request_finished (SoupSession *session, SoupMessageQueueItem *item)
571 GMemoryOutputStream *mostream;
572 GInputStream *istream = NULL;
573 GError *error = NULL;
576 /* Something else already took care of it. */
/* Fetch the buffer attached by send_async_maybe_complete(), if any. */
580 mostream = g_object_get_data (G_OBJECT (item->msg), "SoupSessionAsync:ostream");
585 /* We thought it would be requeued, but it wasn't, so
586 * return the original body.
588 size = g_memory_output_stream_get_data_size (mostream);
589 data = size ? g_memory_output_stream_steal_data (mostream) : g_strdup ("");
590 istream = g_memory_input_stream_new_from_data (data, size, g_free);
591 } else if (item->io_started) {
592 /* The message finished before becoming readable. This
593 * will happen, eg, if it's cancelled from got-headers.
594 * Do nothing; the op will complete via read_ready_cb()
599 /* The message finished before even being started;
600 * probably a tunnel connect failure.
602 istream = g_memory_input_stream_new ();
605 send_request_return_result (item, istream, error);
/* GAsyncReadyCallback for the g_output_stream_splice_async() started in
 * send_async_maybe_complete(). @source is the memory output stream and
 * @user_data is the queue item; the item reference taken for the splice is
 * released here.
 */
609 send_async_spliced (GObject *source, GAsyncResult *result, gpointer user_data)
611 SoupMessageQueueItem *item = user_data;
612 GInputStream *istream = g_object_get_data (source, "istream");
614 GError *error = NULL;
616 /* If the message was cancelled, it will be completed via other means */
617 if (g_cancellable_is_cancelled (item->cancellable) ||
619 soup_message_queue_item_unref (item);
/* Splice failure (-1): report the error through the async result. */
623 if (g_output_stream_splice_finish (G_OUTPUT_STREAM (source),
624 result, &error) == -1) {
625 send_request_return_result (item, NULL, error);
629 /* Otherwise either restarted or finished will eventually be called.
630 * It should be safe to call the sync close() method here since
631 * the message body has already been written.
633 g_input_stream_close (istream, NULL, NULL);
634 do_idle_run_queue (item->session);
635 soup_message_queue_item_unref (item);
/* Decides whether the response @stream can be returned to the caller yet.
 *
 * For a 401 or 407 status, or when the session would redirect the message,
 * the body is spliced asynchronously into a memory output stream stored on
 * the message under "SoupSessionAsync:ostream", and send_async_spliced()
 * runs when the splice ends. The function also returns @stream through
 * send_request_return_result() (presumably in all other cases -- TODO
 * confirm).
 */
639 send_async_maybe_complete (SoupMessageQueueItem *item,
640 GInputStream *stream)
642 if (item->msg->status_code == SOUP_STATUS_UNAUTHORIZED ||
643 item->msg->status_code == SOUP_STATUS_PROXY_UNAUTHORIZED ||
644 soup_session_would_redirect (item->session, item->msg)) {
645 GOutputStream *ostream;
647 /* Message may be requeued, so gather the current message body... */
648 ostream = g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
649 g_object_set_data_full (G_OBJECT (item->msg), "SoupSessionAsync:ostream",
650 ostream, g_object_unref);
/* Attach @stream to the output stream under the "istream" key, with
 * g_object_unref as destroy notify. send_async_spliced() reads it back
 * from there.
 */
652 g_object_set_data_full (G_OBJECT (ostream), "istream",
653 stream, g_object_unref);
655 /* Give the splice op its own ref on item */
656 soup_message_queue_item_ref (item);
657 g_output_stream_splice_async (ostream, stream,
658 /* We can't use CLOSE_SOURCE because it
659 * might get closed in the wrong thread.
661 G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
664 send_async_spliced, item);
668 send_request_return_result (item, stream, NULL);
671 static void try_run_until_read (SoupMessageQueueItem *item);
/* Callback given to soup_message_io_get_source() by try_run_until_read().
 * @user_data is the queue item. Calls try_run_until_read() again.
 */
674 read_ready_cb (SoupMessage *msg, gpointer user_data)
676 SoupMessageQueueItem *item = user_data;
678 try_run_until_read (item);
/* Runs the message I/O with soup_message_io_run_until_read(). On success it
 * fetches the response input stream and passes it to
 * send_async_maybe_complete(). An error other than G_IO_ERROR_WOULD_BLOCK is
 * reported through send_request_return_result(). For WOULD_BLOCK, a message
 * I/O source with read_ready_cb() as callback is attached to the session's
 * async context.
 */
683 try_run_until_read (SoupMessageQueueItem *item)
685 GError *error = NULL;
686 GInputStream *stream = NULL;
689 if (soup_message_io_run_until_read (item->msg, item->cancellable, &error))
690 stream = soup_message_io_get_response_istream (item->msg, &error);
692 send_async_maybe_complete (item, stream);
/* Errors other than WOULD_BLOCK are handed to the async result. */
696 if (!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
697 send_request_return_result (item, NULL, error);
/* WOULD_BLOCK: discard the error and attach a source that calls
 * read_ready_cb(). The local reference to the source is dropped after
 * attaching.
 */
701 g_clear_error (&error);
702 source = soup_message_io_get_source (item->msg, item->cancellable,
703 read_ready_cb, item);
704 g_source_attach (source, soup_session_get_async_context (item->session));
705 g_source_unref (source);
/* Called from process_queue_item() once an item has been sent. Records that
 * I/O has started and makes the first attempt to run the message until it
 * is readable.
 */
709 send_request_running (SoupSession *session, SoupMessageQueueItem *item)
711 item->io_started = TRUE;
712 try_run_until_read (item);
/* Starts an asynchronous send of a message on a SoupSessionAsync. The
 * message is queued without a SoupSessionCallback, its queue item is flagged
 * new_api, and a GSimpleAsyncResult for @callback is stored in item->result.
 * That result is completed later by send_request_return_result().
 *
 * Requires @session to be a SoupSessionAsync with its
 * SOUP_SESSION_USE_THREAD_CONTEXT property set; otherwise the
 * g_return_if_fail checks make the function return early.
 */
716 soup_session_send_request_async (SoupSession *session,
718 GCancellable *cancellable,
719 GAsyncReadyCallback callback,
722 SoupMessageQueueItem *item;
723 gboolean use_thread_context;
725 g_return_if_fail (SOUP_IS_SESSION_ASYNC (session));
727 g_object_get (G_OBJECT (session),
728 SOUP_SESSION_USE_THREAD_CONTEXT, &use_thread_context,
730 g_return_if_fail (use_thread_context);
732 /* Balance out the unref that queuing will eventually do */
735 queue_message (session, msg, NULL, NULL);
737 item = soup_message_queue_lookup (soup_session_get_queue (session), msg);
738 g_return_if_fail (item != NULL);
/* Route completion through the send_request_*() helpers instead of a
 * SoupSessionCallback.
 */
740 item->new_api = TRUE;
741 item->result = g_simple_async_result_new (G_OBJECT (session),
743 soup_session_send_request_async);
/* The result holds the item as its op result, with
 * soup_message_queue_item_unref as destroy notify.
 */
744 g_simple_async_result_set_op_res_gpointer (item->result, item, (GDestroyNotify) soup_message_queue_item_unref);
/* Replace the item's cancellable with a new reference to the caller's. */
747 g_object_unref (item->cancellable);
748 item->cancellable = g_object_ref (cancellable);
/* Finishes an operation started with soup_session_send_request_async().
 * Checks that @session is a SoupSessionAsync and that @result belongs to
 * that operation (returning NULL otherwise), propagates any stored error
 * into @error, and otherwise returns a new reference to the object stored
 * as the result's op-res pointer.
 */
753 soup_session_send_request_finish (SoupSession *session,
754 GAsyncResult *result,
757 GSimpleAsyncResult *simple;
759 g_return_val_if_fail (SOUP_IS_SESSION_ASYNC (session), NULL);
760 g_return_val_if_fail (g_simple_async_result_is_valid (result, G_OBJECT (session), soup_session_send_request_async), NULL);
762 simple = G_SIMPLE_ASYNC_RESULT (result);
763 if (g_simple_async_result_propagate_error (simple, error))
765 return g_object_ref (g_simple_async_result_get_op_res_gpointer (simple));