SoupSession: allow creating a "plain" SoupSession for use with new APIs
authorDan Winship <danw@gnome.org>
Sun, 27 May 2012 15:42:45 +0000 (11:42 -0400)
committerDan Winship <danw@gnome.org>
Mon, 10 Dec 2012 16:14:56 +0000 (17:14 +0100)
In gio-based APIs, async vs sync is a function-level distinction, not
a class-level distinction. Merge most of the existing SoupSessionAsync
and SoupSessionSync code up into SoupSession, and make SoupSession
non-abstract, so that you can create a SoupSession and then use either
sync or async SoupRequest-based APIs on it. (The traditional APIs
still require one of the traditional subclasses, although the code
reorg does affect them in some ways, such as making SoupSessionAsync
more thread-safe.)

libsoup/soup-message-queue.h
libsoup/soup-session-async.c
libsoup/soup-session-private.h
libsoup/soup-session-sync.c
libsoup/soup-session.c
tests/requester-test.c

index d3341bd..dd61924 100644 (file)
@@ -46,7 +46,8 @@ struct _SoupMessageQueueItem {
        guint paused            : 1;
        guint new_api           : 1;
        guint io_started        : 1;
-       guint redirection_count : 29;
+       guint async             : 1;
+       guint redirection_count : 28;
 
        SoupMessageQueueItemState state;
 
index 99edf32..a24f4ba 100644 (file)
  * single-threaded programs.
  **/
 
-static void run_queue (SoupSessionAsync *sa);
-static void do_idle_run_queue (SoupSession *session);
-
-static void send_request_running   (SoupSession *session, SoupMessageQueueItem *item);
-static void send_request_restarted (SoupSession *session, SoupMessageQueueItem *item);
-static void send_request_finished  (SoupSession *session, SoupMessageQueueItem *item);
-
 G_DEFINE_TYPE (SoupSessionAsync, soup_session_async, SOUP_TYPE_SESSION)
 
-typedef struct {
-       SoupSessionAsync *sa;
-       GSList *sources;
-       gboolean disposed;
-
-} SoupSessionAsyncPrivate;
-#define SOUP_SESSION_ASYNC_GET_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), SOUP_TYPE_SESSION_ASYNC, SoupSessionAsyncPrivate))
-
 static void
 soup_session_async_init (SoupSessionAsync *sa)
 {
-       SoupSessionAsyncPrivate *priv = SOUP_SESSION_ASYNC_GET_PRIVATE (sa);
-
-       priv->sa = sa;
 }
 
-static void
-soup_session_async_dispose (GObject *object)
-{
-       SoupSessionAsyncPrivate *priv = SOUP_SESSION_ASYNC_GET_PRIVATE (object);
-       GSList *iter;
-
-       priv->disposed = TRUE;
-       for (iter = priv->sources; iter; iter = iter->next) {
-               g_source_destroy (iter->data);
-               g_source_unref (iter->data);
-       }
-       g_clear_pointer (&priv->sources, g_slist_free);
-
-       G_OBJECT_CLASS (soup_session_async_parent_class)->dispose (object);
-}
-
-
 /**
  * soup_session_async_new:
  *
@@ -106,274 +71,15 @@ soup_session_async_new_with_options (const char *optname1, ...)
 }
 
 static void
-message_completed (SoupMessage *msg, gpointer user_data)
-{
-       SoupMessageQueueItem *item = user_data;
-
-       do_idle_run_queue (item->session);
-
-       if (item->state != SOUP_MESSAGE_RESTARTING)
-               item->state = SOUP_MESSAGE_FINISHING;
-}
-
-static void
-ssl_tunnel_completed (SoupConnection *conn, guint status, gpointer user_data)
-{
-       SoupMessageQueueItem *tunnel_item = user_data;
-       SoupMessageQueueItem *item = tunnel_item->related;
-       SoupSession *session = item->session;
-
-       soup_message_finished (tunnel_item->msg);
-       soup_message_queue_item_unref (tunnel_item);
-
-       if (!SOUP_STATUS_IS_SUCCESSFUL (status)) {
-               soup_session_set_item_connection (session, item, NULL);
-               soup_message_set_status (item->msg, status);
-       }
-
-       item->state = SOUP_MESSAGE_READY;
-       do_idle_run_queue (session);
-       soup_message_queue_item_unref (item);
-}
-
-static void
-tunnel_message_completed (SoupMessage *tunnel_msg, gpointer user_data)
-{
-       SoupMessageQueueItem *tunnel_item = user_data;
-       SoupSession *session = tunnel_item->session;
-       SoupMessageQueueItem *item = tunnel_item->related;
-
-       if (tunnel_item->state == SOUP_MESSAGE_RESTARTING) {
-               soup_message_restarted (tunnel_msg);
-               if (tunnel_item->conn) {
-                       tunnel_item->state = SOUP_MESSAGE_RUNNING;
-                       soup_session_send_queue_item (session, tunnel_item,
-                                                     tunnel_message_completed);
-                       return;
-               }
-
-               soup_message_set_status (tunnel_msg, SOUP_STATUS_TRY_AGAIN);
-       }
-
-       tunnel_item->state = SOUP_MESSAGE_FINISHED;
-       soup_session_unqueue_item (session, tunnel_item);
-
-       if (SOUP_STATUS_IS_SUCCESSFUL (tunnel_msg->status_code)) {
-               soup_connection_start_ssl_async (item->conn, item->cancellable,
-                                                ssl_tunnel_completed, tunnel_item);
-       } else {
-               ssl_tunnel_completed (item->conn, tunnel_msg->status_code,
-                                     tunnel_item);
-       }
-}
-
-static void
-got_connection (SoupConnection *conn, guint status, gpointer user_data)
-{
-       SoupMessageQueueItem *item = user_data;
-       SoupSession *session = item->session;
-
-       if (status != SOUP_STATUS_OK) {
-               if (item->state == SOUP_MESSAGE_CONNECTING) {
-                       soup_session_set_item_status (session, item, status);
-                       soup_session_set_item_connection (session, item, NULL);
-                       item->state = SOUP_MESSAGE_READY;
-               }
-       } else
-               item->state = SOUP_MESSAGE_CONNECTED;
-
-       run_queue ((SoupSessionAsync *)session);
-       soup_message_queue_item_unref (item);
-}
-
-static void
-process_queue_item (SoupMessageQueueItem *item,
-                   gboolean             *should_prune,
-                   gboolean              loop)
-{
-       SoupSession *session = item->session;
-
-       if (item->async_context != soup_session_get_async_context (session))
-               return;
-
-       do {
-               if (item->paused)
-                       return;
-
-               switch (item->state) {
-               case SOUP_MESSAGE_STARTING:
-                       if (!soup_session_get_connection (session, item, should_prune))
-                               return;
-
-                       if (soup_connection_get_state (item->conn) != SOUP_CONNECTION_NEW) {
-                               item->state = SOUP_MESSAGE_READY;
-                               break;
-                       }
-
-                       item->state = SOUP_MESSAGE_CONNECTING;
-                       soup_message_queue_item_ref (item);
-                       soup_connection_connect_async (item->conn, item->cancellable,
-                                                      got_connection, item);
-                       return;
-
-               case SOUP_MESSAGE_CONNECTED:
-                       if (soup_connection_is_tunnelled (item->conn)) {
-                               SoupMessageQueueItem *tunnel_item;
-
-                               soup_message_queue_item_ref (item);
-
-                               item->state = SOUP_MESSAGE_TUNNELING;
-
-                               tunnel_item = soup_session_make_connect_message (session, item->conn);
-                               tunnel_item->related = item;
-                               soup_session_send_queue_item (session, tunnel_item, tunnel_message_completed);
-                               return;
-                       }
-
-                       item->state = SOUP_MESSAGE_READY;
-                       break;
-
-               case SOUP_MESSAGE_READY:
-                       soup_message_set_https_status (item->msg, item->conn);
-                       if (item->msg->status_code) {
-                               if (item->msg->status_code == SOUP_STATUS_TRY_AGAIN) {
-                                       soup_message_cleanup_response (item->msg);
-                                       item->state = SOUP_MESSAGE_STARTING;
-                               } else
-                                       item->state = SOUP_MESSAGE_FINISHING;
-                               break;
-                       }
-
-                       item->state = SOUP_MESSAGE_RUNNING;
-                       soup_session_send_queue_item (session, item, message_completed);
-                       if (item->new_api)
-                               send_request_running (session, item);
-                       break;
-
-               case SOUP_MESSAGE_RESTARTING:
-                       item->state = SOUP_MESSAGE_STARTING;
-                       soup_message_restarted (item->msg);
-                       if (item->new_api)
-                               send_request_restarted (session, item);
-                       break;
-
-               case SOUP_MESSAGE_FINISHING:
-                       item->state = SOUP_MESSAGE_FINISHED;
-                       soup_message_finished (item->msg);
-                       if (item->state != SOUP_MESSAGE_FINISHED) {
-                               g_return_if_fail (!item->new_api);
-                               break;
-                       }
-
-                       soup_message_queue_item_ref (item);
-                       soup_session_unqueue_item (session, item);
-                       if (item->callback)
-                               item->callback (session, item->msg, item->callback_data);
-                       else if (item->new_api)
-                               send_request_finished (session, item);
-
-                       soup_message_queue_item_unref (item);
-                       return;
-
-               default:
-                       /* Nothing to do with this message in any
-                        * other state.
-                        */
-                       return;
-               }
-       } while (loop && item->state != SOUP_MESSAGE_FINISHED);
-}
-
-static void
-run_queue (SoupSessionAsync *sa)
-{
-       SoupSession *session = SOUP_SESSION (sa);
-       SoupMessageQueue *queue = soup_session_get_queue (session);
-       SoupMessageQueueItem *item;
-       SoupMessage *msg;
-       gboolean try_pruning = TRUE, should_prune = FALSE;
-
-       g_object_ref (session);
-       soup_session_cleanup_connections (session, FALSE);
-
- try_again:
-       for (item = soup_message_queue_first (queue);
-            item;
-            item = soup_message_queue_next (queue, item)) {
-               msg = item->msg;
-
-               /* CONNECT messages are handled specially */
-               if (msg->method != SOUP_METHOD_CONNECT)
-                       process_queue_item (item, &should_prune, TRUE);
-       }
-
-       if (try_pruning && should_prune) {
-               /* There is at least one message in the queue that
-                * could be sent if we pruned an idle connection from
-                * some other server.
-                */
-               if (soup_session_cleanup_connections (session, TRUE)) {
-                       try_pruning = should_prune = FALSE;
-                       goto try_again;
-               }
-       }
-
-       g_object_unref (session);
-}
-
-static gboolean
-idle_run_queue (gpointer user_data)
-{
-       SoupSessionAsyncPrivate *priv = user_data;
-       GSource *source;
-
-       if (priv->disposed)
-               return FALSE;
-
-       source = g_main_current_source ();
-       priv->sources = g_slist_remove (priv->sources, source);
-
-       /* Ensure that the source is destroyed before running the queue */
-       g_source_destroy (source);
-       g_source_unref (source);
-
-       run_queue (priv->sa);
-       return FALSE;
-}
-
-static void
-do_idle_run_queue (SoupSession *session)
-{
-       SoupSessionAsyncPrivate *priv = SOUP_SESSION_ASYNC_GET_PRIVATE (session);
-       GMainContext *async_context = soup_session_get_async_context (session);
-       GSource *source;
-
-       if (priv->disposed)
-               return;
-
-       /* We use priv rather than session as the source data, because
-        * other parts of libsoup (or the calling app) may have sources
-        * using the session as the source data.
-        */
-
-       source = g_main_context_find_source_by_user_data (async_context, priv);
-       if (source)
-               return;
-
-       source = soup_add_completion_reffed (async_context, idle_run_queue, priv);
-       priv->sources = g_slist_prepend (priv->sources, source);
-}
-
-static void
 soup_session_async_queue_message (SoupSession *session, SoupMessage *req,
                                  SoupSessionCallback callback, gpointer user_data)
 {
        SoupMessageQueueItem *item;
 
-       item = soup_session_append_queue_item (session, req, callback, user_data);
+       item = soup_session_append_queue_item (session, req, TRUE, FALSE,
+                                              callback, user_data);
+       soup_session_kick_queue (session);
        soup_message_queue_item_unref (item);
-
-       do_idle_run_queue (session);
 }
 
 static guint
@@ -383,10 +89,9 @@ soup_session_async_send_message (SoupSession *session, SoupMessage *req)
        GMainContext *async_context =
                soup_session_get_async_context (session);
 
-       soup_session_async_queue_message (session, req, NULL, NULL);
-
-       item = soup_message_queue_lookup (soup_session_get_queue (session), req);
-       g_return_val_if_fail (item != NULL, SOUP_STATUS_MALFORMED);
+       item = soup_session_append_queue_item (session, req, TRUE, FALSE,
+                                              NULL, NULL);
+       soup_session_kick_queue (session);
 
        while (item->state != SOUP_MESSAGE_FINISHED)
                g_main_context_iteration (async_context, TRUE);
@@ -402,7 +107,6 @@ soup_session_async_cancel_message (SoupSession *session, SoupMessage *msg,
 {
        SoupMessageQueue *queue;
        SoupMessageQueueItem *item;
-       gboolean dummy;
 
        SOUP_SESSION_CLASS (soup_session_async_parent_class)->
                cancel_message (session, msg, status_code);
@@ -424,7 +128,7 @@ soup_session_async_cancel_message (SoupSession *session, SoupMessage *msg,
                item->state = SOUP_MESSAGE_FINISHING;
 
        if (item->state != SOUP_MESSAGE_FINISHED)
-               process_queue_item (item, &dummy, FALSE);
+               soup_session_process_queue_item (session, item, NULL, FALSE);
 
        soup_message_queue_item_unref (item);
 }
@@ -463,268 +167,13 @@ soup_session_async_auth_required (SoupSession *session, SoupMessage *msg,
 }
 
 static void
-soup_session_async_kick (SoupSession *session)
-{
-       do_idle_run_queue (session);
-}
-
-
-static void
-send_request_return_result (SoupMessageQueueItem *item,
-                           gpointer stream, GError *error)
-{
-       GTask *task;
-
-       task = item->task;
-       item->task = NULL;
-
-       if (item->io_source) {
-               g_source_destroy (item->io_source);
-               g_clear_pointer (&item->io_source, g_source_unref);
-       }
-
-       if (error)
-               g_task_return_error (task, error);
-       else if (SOUP_STATUS_IS_TRANSPORT_ERROR (item->msg->status_code)) {
-               if (stream)
-                       g_object_unref (stream);
-               g_task_return_new_error (task, SOUP_HTTP_ERROR,
-                                        item->msg->status_code,
-                                        "%s",
-                                        item->msg->reason_phrase);
-       } else
-               g_task_return_pointer (task, stream, g_object_unref);
-       g_object_unref (task);
-}
-
-static void
-send_request_restarted (SoupSession *session, SoupMessageQueueItem *item)
-{
-       /* We won't be needing this, then. */
-       g_object_set_data (G_OBJECT (item->msg), "SoupSessionAsync:ostream", NULL);
-       item->io_started = FALSE;
-}
-
-static void
-send_request_finished (SoupSession *session, SoupMessageQueueItem *item)
-{
-       GMemoryOutputStream *mostream;
-       GInputStream *istream = NULL;
-       GError *error = NULL;
-
-       if (!item->task) {
-               /* Something else already took care of it. */
-               return;
-       }
-
-       mostream = g_object_get_data (G_OBJECT (item->task), "SoupSessionAsync:ostream");
-       if (mostream) {
-               gpointer data;
-               gssize size;
-
-               /* We thought it would be requeued, but it wasn't, so
-                * return the original body.
-                */
-               size = g_memory_output_stream_get_data_size (mostream);
-               data = size ? g_memory_output_stream_steal_data (mostream) : g_strdup ("");
-               istream = g_memory_input_stream_new_from_data (data, size, g_free);
-       } else if (item->io_started) {
-               /* The message finished before becoming readable. This
-                * will happen, eg, if it's cancelled from got-headers.
-                * Do nothing; the op will complete via read_ready_cb()
-                * after we return;
-                */
-               return;
-       } else {
-               /* The message finished before even being started;
-                * probably a tunnel connect failure.
-                */
-               istream = g_memory_input_stream_new ();
-       }
-
-       send_request_return_result (item, istream, error);
-}
-
-static void
-send_async_spliced (GObject *source, GAsyncResult *result, gpointer user_data)
-{
-       SoupMessageQueueItem *item = user_data;
-       GInputStream *istream = g_object_get_data (source, "istream");
-       GError *error = NULL;
-
-       /* It should be safe to call the sync close() method here since
-        * the message body has already been written.
-        */
-       g_input_stream_close (istream, NULL, NULL);
-       g_object_unref (istream);
-
-       /* If the message was cancelled, it will be completed via other means */
-       if (g_cancellable_is_cancelled (item->cancellable) ||
-           !item->task) {
-               soup_message_queue_item_unref (item);
-               return;
-       }
-
-       if (g_output_stream_splice_finish (G_OUTPUT_STREAM (source),
-                                          result, &error) == -1) {
-               send_request_return_result (item, NULL, error);
-               soup_message_queue_item_unref (item);
-               return;
-       }
-
-       /* Otherwise either restarted or finished will eventually be called. */
-       do_idle_run_queue (item->session);
-       soup_message_queue_item_unref (item);
-}
-
-static void
-send_async_maybe_complete (SoupMessageQueueItem *item,
-                          GInputStream         *stream)
-{
-       if (item->msg->status_code == SOUP_STATUS_UNAUTHORIZED ||
-           item->msg->status_code == SOUP_STATUS_PROXY_UNAUTHORIZED ||
-           soup_session_would_redirect (item->session, item->msg)) {
-               GOutputStream *ostream;
-
-               /* Message may be requeued, so gather the current message body... */
-               ostream = g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
-               g_object_set_data_full (G_OBJECT (item->task), "SoupSessionAsync:ostream",
-                                       ostream, g_object_unref);
-
-               g_object_set_data (G_OBJECT (ostream), "istream", stream);
-
-               /* Give the splice op its own ref on item */
-               soup_message_queue_item_ref (item);
-               g_output_stream_splice_async (ostream, stream,
-                                             /* We can't use CLOSE_SOURCE because it
-                                              * might get closed in the wrong thread.
-                                              */
-                                             G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
-                                             G_PRIORITY_DEFAULT,
-                                             item->cancellable,
-                                             send_async_spliced, item);
-               return;
-       }
-
-       send_request_return_result (item, stream, NULL);
-}
-
-static void try_run_until_read (SoupMessageQueueItem *item);
-
-static gboolean
-read_ready_cb (SoupMessage *msg, gpointer user_data)
-{
-       SoupMessageQueueItem *item = user_data;
-
-       g_clear_pointer (&item->io_source, g_source_unref);
-       try_run_until_read (item);
-       return FALSE;
-}
-
-static void
-try_run_until_read (SoupMessageQueueItem *item)
-{
-       GError *error = NULL;
-       GInputStream *stream = NULL;
-
-       if (soup_message_io_run_until_read (item->msg, item->cancellable, &error))
-               stream = soup_message_io_get_response_istream (item->msg, &error);
-       if (stream) {
-               send_async_maybe_complete (item, stream);
-               return;
-       }
-
-       if (g_error_matches (error, SOUP_HTTP_ERROR, SOUP_STATUS_TRY_AGAIN)) {
-               item->state = SOUP_MESSAGE_RESTARTING;
-               soup_message_io_finished (item->msg);
-               g_error_free (error);
-               return;
-       }
-
-       if (!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
-               if (item->state != SOUP_MESSAGE_FINISHED) {
-                       gboolean dummy;
-
-                       if (soup_message_io_in_progress (item->msg))
-                               soup_message_io_finished (item->msg);
-                       item->state = SOUP_MESSAGE_FINISHING;
-                       process_queue_item (item, &dummy, FALSE);
-               }
-               send_request_return_result (item, NULL, error);
-               return;
-       }
-
-       g_clear_error (&error);
-       item->io_source = soup_message_io_get_source (item->msg, item->cancellable,
-                                                     read_ready_cb, item);
-       g_source_attach (item->io_source, soup_session_get_async_context (item->session));
-}
-
-static void
-send_request_running (SoupSession *session, SoupMessageQueueItem *item)
-{
-       item->io_started = TRUE;
-       try_run_until_read (item);
-}
-
-void
-soup_session_send_request_async (SoupSession         *session,
-                                SoupMessage         *msg,
-                                GCancellable        *cancellable,
-                                GAsyncReadyCallback  callback,
-                                gpointer             user_data)
-{
-       SoupMessageQueueItem *item;
-       gboolean use_thread_context;
-
-       g_return_if_fail (SOUP_IS_SESSION_ASYNC (session));
-
-       g_object_get (G_OBJECT (session),
-                     SOUP_SESSION_USE_THREAD_CONTEXT, &use_thread_context,
-                     NULL);
-       g_return_if_fail (use_thread_context);
-
-       soup_session_async_queue_message (session, msg, NULL, NULL);
-
-       item = soup_message_queue_lookup (soup_session_get_queue (session), msg);
-       g_return_if_fail (item != NULL);
-
-       item->new_api = TRUE;
-       item->task = g_task_new (session, cancellable, callback, user_data);
-       g_task_set_task_data (item->task, item, (GDestroyNotify) soup_message_queue_item_unref);
-
-       if (cancellable) {
-               g_object_unref (item->cancellable);
-               item->cancellable = g_object_ref (cancellable);
-       }
-}
-
-GInputStream *
-soup_session_send_request_finish (SoupSession   *session,
-                                 GAsyncResult  *result,
-                                 GError       **error)
-{
-       g_return_val_if_fail (SOUP_IS_SESSION_ASYNC (session), NULL);
-       g_return_val_if_fail (g_task_is_valid (result, session), NULL);
-
-       return g_task_propagate_pointer (G_TASK (result), error);
-}
-
-static void
 soup_session_async_class_init (SoupSessionAsyncClass *soup_session_async_class)
 {
        SoupSessionClass *session_class = SOUP_SESSION_CLASS (soup_session_async_class);
-       GObjectClass *object_class = G_OBJECT_CLASS (session_class);
-
-       g_type_class_add_private (soup_session_async_class,
-                                 sizeof (SoupSessionAsyncPrivate));
 
        /* virtual method override */
        session_class->queue_message = soup_session_async_queue_message;
        session_class->send_message = soup_session_async_send_message;
        session_class->cancel_message = soup_session_async_cancel_message;
        session_class->auth_required = soup_session_async_auth_required;
-       session_class->kick = soup_session_async_kick;
-
-       object_class->dispose = soup_session_async_dispose;
 }
index 297faf5..dc4d300 100644 (file)
@@ -17,26 +17,12 @@ SoupMessageQueue     *soup_session_get_queue            (SoupSession          *s
 
 SoupMessageQueueItem *soup_session_append_queue_item    (SoupSession          *session,
                                                         SoupMessage          *msg,
+                                                        gboolean              async,
+                                                        gboolean              new_api,
                                                         SoupSessionCallback   callback,
                                                         gpointer              user_data);
-SoupMessageQueueItem *soup_session_make_connect_message (SoupSession          *session,
-                                                        SoupConnection       *conn);
-gboolean              soup_session_get_connection       (SoupSession          *session,
-                                                        SoupMessageQueueItem *item,
-                                                        gboolean             *try_pruning);
-gboolean              soup_session_cleanup_connections  (SoupSession          *session,
-                                                        gboolean              prune_idle);
-void                  soup_session_send_queue_item      (SoupSession          *session,
-                                                        SoupMessageQueueItem *item,
-                                                        SoupMessageCompletionFn completion_cb);
-void                  soup_session_unqueue_item         (SoupSession          *session,
-                                                        SoupMessageQueueItem *item);
-void                  soup_session_set_item_connection  (SoupSession          *session,
-                                                        SoupMessageQueueItem *item,
-                                                        SoupConnection       *conn);
-void                  soup_session_set_item_status      (SoupSession          *session,
-                                                        SoupMessageQueueItem *item,
-                                                        guint                 status_code);
+
+void                  soup_session_kick_queue           (SoupSession          *session);
 
 GInputStream         *soup_session_send_request         (SoupSession          *session,
                                                         SoupMessage          *msg,
@@ -52,6 +38,11 @@ GInputStream         *soup_session_send_request_finish  (SoupSession          *s
                                                         GAsyncResult         *result,
                                                         GError              **error);
 
+void                  soup_session_process_queue_item   (SoupSession          *session,
+                                                        SoupMessageQueueItem *item,
+                                                        gboolean             *should_prune,
+                                                        gboolean              loop);
+
 G_END_DECLS
 
 #endif /* SOUP_SESSION_PRIVATE_H */
index 43d0a49..cbd2460 100644 (file)
  * handler callbacks, until I/O is complete.
  **/
 
-typedef struct {
-       GMutex lock;
-       GCond cond;
-} SoupSessionSyncPrivate;
-#define SOUP_SESSION_SYNC_GET_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), SOUP_TYPE_SESSION_SYNC, SoupSessionSyncPrivate))
-
 G_DEFINE_TYPE (SoupSessionSync, soup_session_sync, SOUP_TYPE_SESSION)
 
 static void
 soup_session_sync_init (SoupSessionSync *ss)
 {
-       SoupSessionSyncPrivate *priv = SOUP_SESSION_SYNC_GET_PRIVATE (ss);
-
-       g_mutex_init (&priv->lock);
-       g_cond_init (&priv->cond);
-}
-
-static void
-soup_session_sync_finalize (GObject *object)
-{
-       SoupSessionSyncPrivate *priv = SOUP_SESSION_SYNC_GET_PRIVATE (object);
-
-       g_mutex_clear (&priv->lock);
-       g_cond_clear (&priv->cond);
-
-       G_OBJECT_CLASS (soup_session_sync_parent_class)->finalize (object);
 }
 
-
 /**
  * soup_session_sync_new:
  *
@@ -107,181 +85,6 @@ soup_session_sync_new_with_options (const char *optname1, ...)
        return session;
 }
 
-static guint
-tunnel_connect (SoupSession *session, SoupMessageQueueItem *related)
-{
-       SoupConnection *conn = related->conn;
-       SoupMessageQueueItem *item;
-       guint status;
-
-       g_object_ref (conn);
-
-       item = soup_session_make_connect_message (session, conn);
-       do {
-               soup_session_send_queue_item (session, item, NULL);
-               status = item->msg->status_code;
-               if (item->state == SOUP_MESSAGE_RESTARTING &&
-                   soup_message_io_in_progress (item->msg)) {
-                       soup_message_restarted (item->msg);
-                       item->state = SOUP_MESSAGE_RUNNING;
-               } else {
-                       if (item->state == SOUP_MESSAGE_RESTARTING)
-                               status = SOUP_STATUS_TRY_AGAIN;
-                       item->state = SOUP_MESSAGE_FINISHED;
-                       soup_message_finished (item->msg);
-               }
-       } while (item->state == SOUP_MESSAGE_STARTING);
-       soup_session_unqueue_item (session, item);
-       soup_message_queue_item_unref (item);
-
-       if (SOUP_STATUS_IS_SUCCESSFUL (status)) {
-               if (!soup_connection_start_ssl_sync (conn, related->cancellable))
-                       status = SOUP_STATUS_SSL_FAILED;
-               soup_message_set_https_status (related->msg, conn);
-       }
-
-       g_object_unref (conn);
-       return status;
-}
-
-static void
-get_connection (SoupMessageQueueItem *item)
-{
-       SoupSession *session = item->session;
-       SoupMessage *msg = item->msg;
-       gboolean try_pruning = FALSE;
-       guint status;
-
-try_again:
-       soup_session_cleanup_connections (session, FALSE);
-
-       if (!soup_session_get_connection (session, item, &try_pruning)) {
-               if (!try_pruning)
-                       return;
-               soup_session_cleanup_connections (session, TRUE);
-               if (!soup_session_get_connection (session, item, &try_pruning))
-                       return;
-               try_pruning = FALSE;
-       }
-
-       if (soup_connection_get_state (item->conn) == SOUP_CONNECTION_IDLE) {
-               item->state = SOUP_MESSAGE_READY;
-               return;
-       }
-
-       if (soup_connection_get_state (item->conn) == SOUP_CONNECTION_NEW) {
-               status = soup_connection_connect_sync (item->conn, item->cancellable);
-               if (status == SOUP_STATUS_TRY_AGAIN) {
-                       soup_session_set_item_connection (session, item, NULL);
-                       goto try_again;
-               }
-
-               soup_message_set_https_status (msg, item->conn);
-
-               if (!SOUP_STATUS_IS_SUCCESSFUL (status)) {
-                       if (!msg->status_code)
-                               soup_session_set_item_status (session, item, status);
-                       item->state = SOUP_MESSAGE_FINISHING;
-                       soup_session_set_item_connection (session, item, NULL);
-                       return;
-               }
-       }
-
-       if (soup_connection_is_tunnelled (item->conn)) {
-               status = tunnel_connect (session, item);
-               if (!SOUP_STATUS_IS_SUCCESSFUL (status)) {
-                       soup_session_set_item_connection (session, item, NULL);
-                       if (status == SOUP_STATUS_TRY_AGAIN)
-                               goto try_again;
-                       soup_session_set_item_status (session, item, status);
-                       item->state = SOUP_MESSAGE_FINISHING;
-                       return;
-               }
-       }
-
-       item->state = SOUP_MESSAGE_READY;
-}
-
-static void process_queue_item (SoupMessageQueueItem *item);
-
-static void
-new_api_message_completed (SoupMessage *msg, gpointer user_data)
-{
-       SoupMessageQueueItem *item = user_data;
-
-       if (item->state != SOUP_MESSAGE_RESTARTING) {
-               item->state = SOUP_MESSAGE_FINISHING;
-               process_queue_item (item);
-       }
-}
-
-static void
-process_queue_item (SoupMessageQueueItem *item)
-{
-       SoupSession *session = item->session;
-       SoupSessionSyncPrivate *priv = SOUP_SESSION_SYNC_GET_PRIVATE (session);
-
-       soup_message_queue_item_ref (item);
-
-       do {
-               if (item->paused) {
-                       g_mutex_lock (&priv->lock);
-                       while (item->paused)
-                               g_cond_wait (&priv->cond, &priv->lock);
-                       g_mutex_unlock (&priv->lock);
-               }
-
-               switch (item->state) {
-               case SOUP_MESSAGE_STARTING:
-                       g_mutex_lock (&priv->lock);
-                       do {
-                               get_connection (item);
-                               if (item->state == SOUP_MESSAGE_STARTING)
-                                       g_cond_wait (&priv->cond, &priv->lock);
-                       } while (item->state == SOUP_MESSAGE_STARTING);
-                       g_mutex_unlock (&priv->lock);
-                       break;
-
-               case SOUP_MESSAGE_READY:
-                       item->state = SOUP_MESSAGE_RUNNING;
-
-                       if (item->new_api) {
-                               soup_session_send_queue_item (item->session, item, new_api_message_completed);
-                               goto out;
-                       }
-
-                       soup_session_send_queue_item (item->session, item, NULL);
-                       if (item->state != SOUP_MESSAGE_RESTARTING)
-                               item->state = SOUP_MESSAGE_FINISHING;
-                       break;
-
-               case SOUP_MESSAGE_RUNNING:
-                       g_warn_if_fail (item->new_api);
-                       item->state = SOUP_MESSAGE_FINISHING;
-                       break;
-
-               case SOUP_MESSAGE_RESTARTING:
-                       item->state = SOUP_MESSAGE_STARTING;
-                       soup_message_restarted (item->msg);
-                       break;
-
-               case SOUP_MESSAGE_FINISHING:
-                       item->state = SOUP_MESSAGE_FINISHED;
-                       soup_message_finished (item->msg);
-                       soup_session_unqueue_item (session, item);
-                       break;
-
-               default:
-                       g_warn_if_reached ();
-                       item->state = SOUP_MESSAGE_FINISHING;
-                       break;
-               }
-       } while (item->state != SOUP_MESSAGE_FINISHED);
-
- out:
-       soup_message_queue_item_unref (item);
-}
-
 static gboolean
 queue_message_callback (gpointer data)
 {
@@ -297,7 +100,7 @@ queue_message_thread (gpointer data)
 {
        SoupMessageQueueItem *item = data;
 
-       process_queue_item (item);
+       soup_session_process_queue_item (item->session, item, NULL, TRUE);
        if (item->callback) {
                soup_add_completion (soup_session_get_async_context (item->session),
                                     queue_message_callback, item);
@@ -314,7 +117,8 @@ soup_session_sync_queue_message (SoupSession *session, SoupMessage *msg,
        SoupMessageQueueItem *item;
        GThread *thread;
 
-       item = soup_session_append_queue_item (session, msg, callback, user_data);
+       item = soup_session_append_queue_item (session, msg, FALSE, FALSE,
+                                              callback, user_data);
        thread = g_thread_new ("SoupSessionSync:queue_message",
                               queue_message_thread, item);
        g_thread_unref (thread);
@@ -326,25 +130,15 @@ soup_session_sync_send_message (SoupSession *session, SoupMessage *msg)
        SoupMessageQueueItem *item;
        guint status;
 
-       item = soup_session_append_queue_item (session, msg, NULL, NULL);
-       process_queue_item (item);
+       item = soup_session_append_queue_item (session, msg, FALSE, FALSE,
+                                              NULL, NULL);
+       soup_session_process_queue_item (session, item, NULL, TRUE);
        status = msg->status_code;
        soup_message_queue_item_unref (item);
        return status;
 }
 
 static void
-soup_session_sync_cancel_message (SoupSession *session, SoupMessage *msg, guint status_code)
-{
-       SoupSessionSyncPrivate *priv = SOUP_SESSION_SYNC_GET_PRIVATE (session);
-
-       g_mutex_lock (&priv->lock);
-       SOUP_SESSION_CLASS (soup_session_sync_parent_class)->cancel_message (session, msg, status_code);
-       g_cond_broadcast (&priv->cond);
-       g_mutex_unlock (&priv->lock);
-}
-
-static void
 soup_session_sync_auth_required (SoupSession *session, SoupMessage *msg,
                                 SoupAuth *auth, gboolean retrying)
 {
@@ -363,182 +157,12 @@ soup_session_sync_auth_required (SoupSession *session, SoupMessage *msg,
 }
 
 static void
-soup_session_sync_flush_queue (SoupSession *session)
-{
-       SoupSessionSyncPrivate *priv = SOUP_SESSION_SYNC_GET_PRIVATE (session);
-       SoupMessageQueue *queue;
-       SoupMessageQueueItem *item;
-       GHashTable *current;
-       gboolean done = FALSE;
-
-       /* Record the current contents of the queue */
-       current = g_hash_table_new (NULL, NULL);
-       queue = soup_session_get_queue (session);
-       for (item = soup_message_queue_first (queue);
-            item;
-            item = soup_message_queue_next (queue, item))
-               g_hash_table_insert (current, item, item);
-
-       /* Cancel everything */
-       SOUP_SESSION_CLASS (soup_session_sync_parent_class)->flush_queue (session);
-
-       /* Wait until all of the items in @current have been removed
-        * from the queue. (This is not the same as "wait for the
-        * queue to be empty", because the app may queue new requests
-        * in response to the cancellation of the old ones. We don't
-        * try to cancel those requests as well, since we'd likely
-        * just end up looping forever.)
-        */
-       g_mutex_lock (&priv->lock);
-       do {
-               done = TRUE;
-               for (item = soup_message_queue_first (queue);
-                    item;
-                    item = soup_message_queue_next (queue, item)) {
-                       if (g_hash_table_lookup (current, item))
-                               done = FALSE;
-               }
-
-               if (!done)
-                       g_cond_wait (&priv->cond, &priv->lock);
-       } while (!done);
-       g_mutex_unlock (&priv->lock);
-
-       g_hash_table_destroy (current);
-}
-
-static void
-soup_session_sync_kick (SoupSession *session)
-{
-       SoupSessionSyncPrivate *priv = SOUP_SESSION_SYNC_GET_PRIVATE (session);
-
-       g_cond_broadcast (&priv->cond);
-}
-
-static void
 soup_session_sync_class_init (SoupSessionSyncClass *session_sync_class)
 {
-       GObjectClass *object_class = G_OBJECT_CLASS (session_sync_class);
        SoupSessionClass *session_class = SOUP_SESSION_CLASS (session_sync_class);
 
-       g_type_class_add_private (session_sync_class, sizeof (SoupSessionSyncPrivate));
-
        /* virtual method override */
        session_class->queue_message = soup_session_sync_queue_message;
        session_class->send_message = soup_session_sync_send_message;
-       session_class->cancel_message = soup_session_sync_cancel_message;
        session_class->auth_required = soup_session_sync_auth_required;
-       session_class->flush_queue = soup_session_sync_flush_queue;
-       session_class->kick = soup_session_sync_kick;
-
-       object_class->finalize = soup_session_sync_finalize;
-}
-
-
-GInputStream *
-soup_session_send_request (SoupSession   *session,
-                          SoupMessage   *msg,
-                          GCancellable  *cancellable,
-                          GError       **error)
-{
-       SoupMessageQueueItem *item;
-       GInputStream *stream = NULL;
-       GOutputStream *ostream;
-       GMemoryOutputStream *mostream;
-       gssize size;
-       GError *my_error = NULL;
-
-       g_return_val_if_fail (SOUP_IS_SESSION_SYNC (session), NULL);
-
-       item = soup_session_append_queue_item (session, msg, NULL, NULL);
-
-       item->new_api = TRUE;
-       if (cancellable) {
-               g_object_unref (item->cancellable);
-               item->cancellable = g_object_ref (cancellable);
-       }
-
-       while (!stream) {
-               /* Get a connection, etc */
-               process_queue_item (item);
-               if (item->state != SOUP_MESSAGE_RUNNING)
-                       break;
-
-               /* Send request, read headers */
-               if (!soup_message_io_run_until_read (msg, item->cancellable, &my_error)) {
-                       if (g_error_matches (my_error, SOUP_HTTP_ERROR, SOUP_STATUS_TRY_AGAIN)) {
-                               item->state = SOUP_MESSAGE_RESTARTING;
-                               soup_message_io_finished (item->msg);
-                               g_clear_error (&my_error);
-                               continue;
-                       } else
-                               break;
-               }
-
-               stream = soup_message_io_get_response_istream (msg, &my_error);
-               if (!stream)
-                       break;
-
-               /* Break if the message doesn't look likely-to-be-requeued */
-               if (msg->status_code != SOUP_STATUS_UNAUTHORIZED &&
-                   msg->status_code != SOUP_STATUS_PROXY_UNAUTHORIZED &&
-                   !soup_session_would_redirect (session, msg))
-                       break;
-
-               /* Gather the current message body... */
-               ostream = g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
-               if (g_output_stream_splice (ostream, stream,
-                                           G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE |
-                                           G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
-                                           item->cancellable, &my_error) == -1) {
-                       g_object_unref (stream);
-                       g_object_unref (ostream);
-                       stream = NULL;
-                       break;
-               }
-               g_object_unref (stream);
-               stream = NULL;
-
-               /* If the message was requeued, loop */
-               if (item->state == SOUP_MESSAGE_RESTARTING) {
-                       g_object_unref (ostream);
-                       continue;
-               }
-
-               /* Not requeued, so return the original body */
-               mostream = G_MEMORY_OUTPUT_STREAM (ostream);
-               size = g_memory_output_stream_get_data_size (mostream);
-               stream = g_memory_input_stream_new ();
-               if (size) {
-                       g_memory_input_stream_add_data (G_MEMORY_INPUT_STREAM (stream),
-                                                       g_memory_output_stream_steal_data (mostream),
-                                                       size, g_free);
-               }
-               g_object_unref (ostream);
-       }
-
-       if (my_error)
-               g_propagate_error (error, my_error);
-       else if (SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code)) {
-               if (stream) {
-                       g_object_unref (stream);
-                       stream = NULL;
-               }
-               g_set_error_literal (error, SOUP_HTTP_ERROR, msg->status_code,
-                                    msg->reason_phrase);
-       } else if (!stream)
-               stream = g_memory_input_stream_new ();
-
-       if (!stream) {
-               if (soup_message_io_in_progress (msg))
-                       soup_message_io_finished (msg);
-               else if (item->state != SOUP_MESSAGE_FINISHED)
-                       item->state = SOUP_MESSAGE_FINISHING;
-
-               if (item->state != SOUP_MESSAGE_FINISHED)
-                       process_queue_item (item);
-       }
-
-       soup_message_queue_item_unref (item);
-       return stream;
 }
index 3eecd06..f5d93d4 100644 (file)
@@ -77,6 +77,8 @@ static guint soup_host_uri_hash (gconstpointer key);
 static gboolean soup_host_uri_equal (gconstpointer v1, gconstpointer v2);
 
 typedef struct {
+       SoupSession *session;
+
        GTlsDatabase *tlsdb;
        char *ssl_ca_file;
        gboolean ssl_strict;
@@ -100,12 +102,15 @@ typedef struct {
         * SoupSessionHost, adding/removing a connection,
         * disconnecting a connection, or moving a connection from
         * IDLE to IN_USE. Must not emit signals or destroy objects
-        * while holding it.
+        * while holding it. conn_cond is signaled when it may be
+        * possible for a previously-blocked message to continue.
         */
        GMutex conn_lock;
+       GCond conn_cond;
 
        GMainContext *async_context;
        gboolean use_thread_context;
+       GSList *run_queue_sources;
 
        GResolver *resolver;
 
@@ -126,6 +131,10 @@ static void auth_manager_authenticate (SoupAuthManager *manager,
                                       SoupMessage *msg, SoupAuth *auth,
                                       gboolean retrying, gpointer user_data);
 
+static void async_run_queue (SoupSession *session);
+
+static void async_send_request_running (SoupSession *session, SoupMessageQueueItem *item);
+
 #define SOUP_SESSION_MAX_CONNS_DEFAULT 10
 #define SOUP_SESSION_MAX_CONNS_PER_HOST_DEFAULT 2
 
@@ -133,9 +142,9 @@ static void auth_manager_authenticate (SoupAuthManager *manager,
 
 #define SOUP_SESSION_USER_AGENT_BASE "libsoup/" PACKAGE_VERSION
 
-G_DEFINE_ABSTRACT_TYPE_WITH_CODE (SoupSession, soup_session, G_TYPE_OBJECT,
-                                 soup_init ();
-                                 )
+G_DEFINE_TYPE_WITH_CODE (SoupSession, soup_session, G_TYPE_OBJECT,
+                        soup_init ();
+                        )
 
 enum {
        REQUEST_QUEUED,
@@ -182,9 +191,12 @@ soup_session_init (SoupSession *session)
        SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
        SoupAuthManager *auth_manager;
 
+       priv->session = session;
+
        priv->queue = soup_message_queue_new (session);
 
        g_mutex_init (&priv->conn_lock);
+       g_cond_init (&priv->conn_cond);
        priv->http_hosts = g_hash_table_new_full (soup_host_uri_hash,
                                                  soup_host_uri_equal,
                                                  NULL, (GDestroyNotify)free_host);
@@ -225,6 +237,15 @@ soup_session_dispose (GObject *object)
 {
        SoupSession *session = SOUP_SESSION (object);
        SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
+       GSList *iter;
+
+       priv->disposed = TRUE;
+
+       for (iter = priv->run_queue_sources; iter; iter = iter->next) {
+               g_source_destroy (iter->data);
+               g_source_unref (iter->data);
+       }
+       g_clear_pointer (&priv->run_queue_sources, g_slist_free);
 
        priv->disposed = TRUE;
        soup_session_abort (session);
@@ -245,6 +266,7 @@ soup_session_finalize (GObject *object)
        soup_message_queue_destroy (priv->queue);
 
        g_mutex_clear (&priv->conn_lock);
+       g_cond_clear (&priv->conn_cond);
        g_hash_table_destroy (priv->http_hosts);
        g_hash_table_destroy (priv->https_hosts);
        g_hash_table_destroy (priv->conns);
@@ -826,10 +848,7 @@ get_host_for_uri (SoupSession *session, SoupURI *uri)
        return host;
 }
 
-/* Note: get_host_for_message doesn't lock the conn_lock. The caller
- * must do it itself if there's a chance the host doesn't already
- * exist.
- */
+/* Requires conn_lock to be locked */
 static SoupSessionHost *
 get_host_for_message (SoupSession *session, SoupMessage *msg)
 {
@@ -1031,6 +1050,36 @@ redirect_handler (SoupMessage *msg, gpointer user_data)
 }
 
 static void
+proxy_connection_event (SoupConnection      *conn,
+                       GSocketClientEvent   event,
+                       GIOStream           *connection,
+                       gpointer             user_data)
+{
+       SoupMessageQueueItem *item = user_data;
+
+       soup_message_network_event (item->msg, event, connection);
+}
+
+static void
+soup_session_set_item_connection (SoupSession          *session,
+                                 SoupMessageQueueItem *item,
+                                 SoupConnection       *conn)
+{
+       if (item->conn) {
+               g_signal_handlers_disconnect_by_func (item->conn, proxy_connection_event, item);
+               g_object_unref (item->conn);
+       }
+
+       item->conn = conn;
+
+       if (item->conn) {
+               g_object_ref (item->conn);
+               g_signal_connect (item->conn, "event",
+                                 G_CALLBACK (proxy_connection_event), item);
+       }
+}
+
+static void
 message_restarted (SoupMessage *msg, gpointer user_data)
 {
        SoupMessageQueueItem *item = user_data;
@@ -1048,6 +1097,7 @@ message_restarted (SoupMessage *msg, gpointer user_data)
 
 SoupMessageQueueItem *
 soup_session_append_queue_item (SoupSession *session, SoupMessage *msg,
+                               gboolean async, gboolean new_api,
                                SoupSessionCallback callback, gpointer user_data)
 {
        SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
@@ -1057,6 +1107,8 @@ soup_session_append_queue_item (SoupSession *session, SoupMessage *msg,
        soup_message_cleanup_response (msg);
 
        item = soup_message_queue_append (priv->queue, msg, callback, user_data);
+       item->async = async;
+       item->new_api = new_api;
 
        g_mutex_lock (&priv->conn_lock);
        host = get_host_for_message (session, item->msg);
@@ -1077,7 +1129,7 @@ soup_session_append_queue_item (SoupSession *session, SoupMessage *msg,
        return item;
 }
 
-void
+static void
 soup_session_send_queue_item (SoupSession *session,
                              SoupMessageQueueItem *item,
                              SoupMessageCompletionFn completion_cb)
@@ -1116,9 +1168,9 @@ soup_session_send_queue_item (SoupSession *session,
                soup_connection_send_request (item->conn, item, completion_cb, item);
 }
 
-gboolean
+static gboolean
 soup_session_cleanup_connections (SoupSession *session,
-                                 gboolean     prune_idle)
+                                 gboolean     cleanup_idle)
 {
        SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
        GSList *conns = NULL, *c;
@@ -1131,7 +1183,7 @@ soup_session_cleanup_connections (SoupSession *session,
        while (g_hash_table_iter_next (&iter, &conn, &host)) {
                state = soup_connection_get_state (conn);
                if (state == SOUP_CONNECTION_REMOTE_DISCONNECTED ||
-                   (prune_idle && state == SOUP_CONNECTION_IDLE)) {
+                   (cleanup_idle && state == SOUP_CONNECTION_IDLE)) {
                        conns = g_slist_prepend (conns, g_object_ref (conn));
                        g_hash_table_iter_remove (&iter);
                        drop_connection (session, host, conn);
@@ -1233,7 +1285,7 @@ connection_disconnected (SoupConnection *conn, gpointer user_data)
 
        g_mutex_unlock (&priv->conn_lock);
 
-       SOUP_SESSION_GET_CLASS (session)->kick (session);
+       soup_session_kick_queue (session);
 }
 
 static void
@@ -1243,88 +1295,281 @@ connection_state_changed (GObject *object, GParamSpec *param, gpointer user_data
        SoupConnection *conn = SOUP_CONNECTION (object);
 
        if (soup_connection_get_state (conn) == SOUP_CONNECTION_IDLE)
-               SOUP_SESSION_GET_CLASS (session)->kick (session);
+               soup_session_kick_queue (session);
 }
 
-SoupMessageQueueItem *
-soup_session_make_connect_message (SoupSession    *session,
-                                  SoupConnection *conn)
+SoupMessageQueue *
+soup_session_get_queue (SoupSession *session)
+{
+       SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
+
+       return priv->queue;
+}
+
+static void
+soup_session_unqueue_item (SoupSession          *session,
+                          SoupMessageQueueItem *item)
+{
+       SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
+       SoupSessionHost *host;
+
+       if (item->conn) {
+               if (item->msg->method != SOUP_METHOD_CONNECT ||
+                   !SOUP_STATUS_IS_SUCCESSFUL (item->msg->status_code))
+                       soup_connection_set_state (item->conn, SOUP_CONNECTION_IDLE);
+               soup_session_set_item_connection (session, item, NULL);
+       }
+
+       if (item->state != SOUP_MESSAGE_FINISHED) {
+               g_warning ("finished an item with state %d", item->state);
+               return;
+       }
+
+       soup_message_queue_remove (priv->queue, item);
+
+       g_mutex_lock (&priv->conn_lock);
+       host = get_host_for_message (session, item->msg);
+       host->num_messages--;
+       g_mutex_unlock (&priv->conn_lock);
+
+       /* g_signal_handlers_disconnect_by_func doesn't work if you
+        * have a metamarshal, meaning it doesn't work with
+        * soup_message_add_header_handler()
+        */
+       g_signal_handlers_disconnect_matched (item->msg, G_SIGNAL_MATCH_DATA,
+                                             0, 0, NULL, NULL, item);
+       g_signal_emit (session, signals[REQUEST_UNQUEUED], 0, item->msg);
+       soup_message_queue_item_unref (item);
+}
+
+static void
+soup_session_set_item_status (SoupSession          *session,
+                             SoupMessageQueueItem *item,
+                             guint                 status_code)
+{
+       SoupURI *uri;
+       char *msg;
+
+       switch (status_code) {
+       case SOUP_STATUS_CANT_RESOLVE:
+       case SOUP_STATUS_CANT_CONNECT:
+               uri = soup_message_get_uri (item->msg);
+               msg = g_strdup_printf ("%s (%s)",
+                                      soup_status_get_phrase (status_code),
+                                      uri->host);
+               soup_message_set_status_full (item->msg, status_code, msg);
+               g_free (msg);
+               break;
+
+       case SOUP_STATUS_CANT_RESOLVE_PROXY:
+       case SOUP_STATUS_CANT_CONNECT_PROXY:
+               if (item->proxy_uri && item->proxy_uri->host) {
+                       msg = g_strdup_printf ("%s (%s)",
+                                              soup_status_get_phrase (status_code),
+                                              item->proxy_uri->host);
+                       soup_message_set_status_full (item->msg, status_code, msg);
+                       g_free (msg);
+                       break;
+               }
+               soup_message_set_status (item->msg, status_code);
+               break;
+
+       case SOUP_STATUS_SSL_FAILED:
+               if (!g_tls_backend_supports_tls (g_tls_backend_get_default ())) {
+                       soup_message_set_status_full (item->msg, status_code,
+                                                     "TLS/SSL support not available; install glib-networking");
+               } else
+                       soup_message_set_status (item->msg, status_code);
+               break;
+
+       default:
+               soup_message_set_status (item->msg, status_code);
+               break;
+       }
+}
+
+
+static void
+message_completed (SoupMessage *msg, gpointer user_data)
+{
+       SoupMessageQueueItem *item = user_data;
+
+       if (item->async)
+               soup_session_kick_queue (item->session);
+
+       if (item->state != SOUP_MESSAGE_RESTARTING) {
+               item->state = SOUP_MESSAGE_FINISHING;
+
+               if (item->new_api && !item->async)
+                       soup_session_process_queue_item (item->session, item, NULL, TRUE);
+       }
+}
+
+static void
+tunnel_complete (SoupConnection *conn, guint status, gpointer user_data)
+{
+       SoupMessageQueueItem *tunnel_item = user_data;
+       SoupMessageQueueItem *item = tunnel_item->related;
+       SoupSession *session = tunnel_item->session;
+
+       soup_message_finished (tunnel_item->msg);
+       soup_message_queue_item_unref (tunnel_item);
+
+       if (item->msg->status_code)
+               item->state = SOUP_MESSAGE_FINISHING;
+       else
+               soup_message_set_https_status (item->msg, item->conn);
+
+       if (!SOUP_STATUS_IS_SUCCESSFUL (status)) {
+               soup_session_set_item_connection (session, item, NULL);
+               soup_session_set_item_status (session, item, status);
+       }
+
+       item->state = SOUP_MESSAGE_READY;
+       if (item->async)
+               soup_session_kick_queue (session);
+       soup_message_queue_item_unref (item);
+}
+
+static void
+tunnel_message_completed (SoupMessage *msg, gpointer user_data)
+{
+       SoupMessageQueueItem *tunnel_item = user_data;
+       SoupMessageQueueItem *item = tunnel_item->related;
+       SoupSession *session = tunnel_item->session;
+       guint status;
+
+       if (tunnel_item->state == SOUP_MESSAGE_RESTARTING) {
+               soup_message_restarted (msg);
+               if (tunnel_item->conn) {
+                       tunnel_item->state = SOUP_MESSAGE_RUNNING;
+                       soup_session_send_queue_item (session, tunnel_item,
+                                                     tunnel_message_completed);
+                       return;
+               }
+
+               soup_message_set_status (msg, SOUP_STATUS_TRY_AGAIN);
+       }
+
+       tunnel_item->state = SOUP_MESSAGE_FINISHED;
+       soup_session_unqueue_item (session, tunnel_item);
+
+       status = tunnel_item->msg->status_code;
+       if (!SOUP_STATUS_IS_SUCCESSFUL (status)) {
+               tunnel_complete (item->conn, status, tunnel_item);
+               return;
+       }
+
+       if (tunnel_item->async) {
+               soup_connection_start_ssl_async (item->conn, item->cancellable,
+                                                tunnel_complete, tunnel_item);
+       } else {
+               status = soup_connection_start_ssl_sync (item->conn, item->cancellable);
+               tunnel_complete (item->conn, status, tunnel_item);
+       }
+}
+
+static void
+tunnel_connect (SoupMessageQueueItem *item)
 {
+       SoupSession *session = item->session;
+       SoupMessageQueueItem *tunnel_item;
        SoupURI *uri;
        SoupMessage *msg;
-       SoupMessageQueueItem *item;
 
-       uri = soup_connection_get_remote_uri (conn);
+       item->state = SOUP_MESSAGE_TUNNELING;
+
+       uri = soup_connection_get_remote_uri (item->conn);
        msg = soup_message_new_from_uri (SOUP_METHOD_CONNECT, uri);
        soup_message_set_flags (msg, SOUP_MESSAGE_NO_REDIRECT);
 
-       item = soup_session_append_queue_item (session, msg, NULL, NULL);
-       soup_session_set_item_connection (session, item, conn);
+       tunnel_item = soup_session_append_queue_item (session, msg,
+                                                     item->async, FALSE,
+                                                     NULL, NULL);
        g_object_unref (msg);
-       item->state = SOUP_MESSAGE_RUNNING;
+       tunnel_item->related = item;
+       soup_message_queue_item_ref (item);
+       soup_session_set_item_connection (session, tunnel_item, item->conn);
+       tunnel_item->state = SOUP_MESSAGE_RUNNING;
 
-       g_signal_emit (session, signals[TUNNELING], 0, conn);
-       return item;
+       g_signal_emit (session, signals[TUNNELING], 0, tunnel_item->conn);
+
+       soup_session_send_queue_item (session, tunnel_item,
+                                     tunnel_message_completed);
 }
 
-gboolean
-soup_session_get_connection (SoupSession *session,
-                            SoupMessageQueueItem *item,
-                            gboolean *try_pruning)
+static void
+got_connection (SoupConnection *conn, guint status, gpointer user_data)
+{
+       SoupMessageQueueItem *item = user_data;
+       SoupSession *session = item->session;
+
+       if (status != SOUP_STATUS_OK) {
+               if (item->state == SOUP_MESSAGE_CONNECTING) {
+                       soup_session_set_item_status (session, item, status);
+                       soup_session_set_item_connection (session, item, NULL);
+                       item->state = SOUP_MESSAGE_READY;
+               }
+       } else
+               item->state = SOUP_MESSAGE_CONNECTED;
+
+       if (item->async) {
+               if (item->state == SOUP_MESSAGE_CONNECTED ||
+                   item->state == SOUP_MESSAGE_READY)
+                       async_run_queue (item->session);
+               else
+                       soup_session_kick_queue (item->session);
+
+               soup_message_queue_item_unref (item);
+       }
+}
+
+/* requires conn_lock */
+static SoupConnection *
+get_connection_for_host (SoupSession *session,
+                        SoupMessageQueueItem *item,
+                        SoupSessionHost *host,
+                        gboolean need_new_connection,
+                        gboolean *try_cleanup)
 {
        SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
        SoupConnection *conn;
-       SoupSessionHost *host;
        GSList *conns;
        int num_pending = 0;
-       gboolean need_new_connection;
 
        if (priv->disposed)
                return FALSE;
 
        if (item->conn) {
                g_return_val_if_fail (soup_connection_get_state (item->conn) != SOUP_CONNECTION_DISCONNECTED, FALSE);
-               return TRUE;
+               return item->conn;
        }
 
-       need_new_connection =
-               (soup_message_get_flags (item->msg) & SOUP_MESSAGE_NEW_CONNECTION) ||
-               (!(soup_message_get_flags (item->msg) & SOUP_MESSAGE_IDEMPOTENT) &&
-                !SOUP_METHOD_IS_IDEMPOTENT (item->msg->method));
-
-       g_mutex_lock (&priv->conn_lock);
-
-       host = get_host_for_message (session, item->msg);
        for (conns = host->connections; conns; conns = conns->next) {
-               if (!need_new_connection && soup_connection_get_state (conns->data) == SOUP_CONNECTION_IDLE) {
-                       soup_connection_set_state (conns->data, SOUP_CONNECTION_IN_USE);
-                       g_mutex_unlock (&priv->conn_lock);
-                       soup_session_set_item_connection (session, item, conns->data);
-                       soup_message_set_https_status (item->msg, item->conn);
-                       return TRUE;
-               } else if (soup_connection_get_state (conns->data) == SOUP_CONNECTION_CONNECTING)
+               conn = conns->data;
+
+               if (!need_new_connection && soup_connection_get_state (conn) == SOUP_CONNECTION_IDLE) {
+                       soup_connection_set_state (conn, SOUP_CONNECTION_IN_USE);
+                       return conn;
+               } else if (soup_connection_get_state (conn) == SOUP_CONNECTION_CONNECTING)
                        num_pending++;
        }
 
        /* Limit the number of pending connections; num_messages / 2
         * is somewhat arbitrary...
         */
-       if (num_pending > host->num_messages / 2) {
-               g_mutex_unlock (&priv->conn_lock);
-               return FALSE;
-       }
+       if (num_pending > host->num_messages / 2)
+               return NULL;
 
        if (host->num_conns >= priv->max_conns_per_host) {
                if (need_new_connection)
-                       *try_pruning = TRUE;
-               g_mutex_unlock (&priv->conn_lock);
-               return FALSE;
+                       *try_cleanup = TRUE;
+               return NULL;
        }
 
        if (priv->num_conns >= priv->max_conns) {
-               *try_pruning = TRUE;
-               g_mutex_unlock (&priv->conn_lock);
-               return FALSE;
+               *try_cleanup = TRUE;
+               return NULL;
        }
 
        conn = g_object_new (
@@ -1347,6 +1592,10 @@ soup_session_get_connection (SoupSession *session,
                          G_CALLBACK (connection_state_changed),
                          session);
 
+       /* This is a debugging-related signal, and so can ignore the
+        * usual rule about not emitting signals while holding
+        * conn_lock.
+        */
        g_signal_emit (session, signals[CONNECTION_CREATED], 0, conn);
 
        g_hash_table_insert (priv->conns, conn, host);
@@ -1361,129 +1610,220 @@ soup_session_get_connection (SoupSession *session,
                host->keep_alive_src = NULL;
        }
 
-       g_mutex_unlock (&priv->conn_lock);
-       soup_session_set_item_connection (session, item, conn);
-       return TRUE;
+       return conn;
 }
 
-SoupMessageQueue *
-soup_session_get_queue (SoupSession *session)
+static gboolean
+get_connection (SoupMessageQueueItem *item, gboolean *should_cleanup)
 {
+       SoupSession *session = item->session;
        SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
+       SoupSessionHost *host;
+       SoupConnection *conn = NULL;
+       gboolean my_should_cleanup = FALSE;
+       gboolean need_new_connection;
 
-       return priv->queue;
+       need_new_connection =
+               (soup_message_get_flags (item->msg) & SOUP_MESSAGE_NEW_CONNECTION) ||
+               (!(soup_message_get_flags (item->msg) & SOUP_MESSAGE_IDEMPOTENT) &&
+                !SOUP_METHOD_IS_IDEMPOTENT (item->msg->method));
+
+       g_mutex_lock (&priv->conn_lock);
+       host = get_host_for_message (session, item->msg);
+       while (TRUE) {
+               conn = get_connection_for_host (session, item, host,
+                                               need_new_connection,
+                                               &my_should_cleanup);
+               if (conn || item->async)
+                       break;
+
+               if (my_should_cleanup) {
+                       g_mutex_unlock (&priv->conn_lock);
+                       soup_session_cleanup_connections (session, TRUE);
+                       g_mutex_lock (&priv->conn_lock);
+
+                       my_should_cleanup = FALSE;
+                       continue;
+               }
+
+               g_cond_wait (&priv->conn_cond, &priv->conn_lock);
+       }
+       g_mutex_unlock (&priv->conn_lock);
+
+       if (!conn) {
+               if (should_cleanup)
+                       *should_cleanup = my_should_cleanup;
+               return FALSE;
+       }
+
+       soup_session_set_item_connection (session, item, conn);
+       soup_message_set_https_status (item->msg, item->conn);
+
+       if (soup_connection_get_state (item->conn) != SOUP_CONNECTION_NEW) {
+               item->state = SOUP_MESSAGE_READY;
+               return TRUE;
+       }
+
+       item->state = SOUP_MESSAGE_CONNECTING;
+
+       if (item->async) {
+               soup_message_queue_item_ref (item);
+               soup_connection_connect_async (item->conn, item->cancellable,
+                                              got_connection, item);
+               return FALSE;
+       } else {
+               guint status;
+
+               status = soup_connection_connect_sync (item->conn, item->cancellable);
+               got_connection (item->conn, status, item);
+
+               return TRUE;
+       }
 }
 
 void
-soup_session_unqueue_item (SoupSession          *session,
-                          SoupMessageQueueItem *item)
+soup_session_process_queue_item (SoupSession          *session,
+                                SoupMessageQueueItem *item,
+                                gboolean             *should_cleanup,
+                                gboolean              loop)
 {
-       SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
-       SoupSessionHost *host;
+       g_assert (item->session == session);
 
-       if (item->conn) {
-               if (item->msg->method != SOUP_METHOD_CONNECT ||
-                   !SOUP_STATUS_IS_SUCCESSFUL (item->msg->status_code))
-                       soup_connection_set_state (item->conn, SOUP_CONNECTION_IDLE);
-               soup_session_set_item_connection (session, item, NULL);
-       }
+       do {
+               if (item->paused)
+                       return;
 
-       if (item->state != SOUP_MESSAGE_FINISHED) {
-               g_warning ("finished an item with state %d", item->state);
-               return;
-       }
+               switch (item->state) {
+               case SOUP_MESSAGE_STARTING:
+                       if (!get_connection (item, should_cleanup))
+                               return;
+                       break;
 
-       soup_message_queue_remove (priv->queue, item);
+               case SOUP_MESSAGE_CONNECTED:
+                       if (soup_connection_is_tunnelled (item->conn))
+                               tunnel_connect (item);
+                       else
+                               item->state = SOUP_MESSAGE_READY;
+                       break;
 
-       g_mutex_lock (&priv->conn_lock);
-       host = get_host_for_message (session, item->msg);
-       host->num_messages--;
-       g_mutex_unlock (&priv->conn_lock);
+               case SOUP_MESSAGE_READY:
+                       soup_message_set_https_status (item->msg, item->conn);
+                       if (item->msg->status_code) {
+                               if (item->msg->status_code == SOUP_STATUS_TRY_AGAIN) {
+                                       soup_message_cleanup_response (item->msg);
+                                       item->state = SOUP_MESSAGE_STARTING;
+                               } else
+                                       item->state = SOUP_MESSAGE_FINISHING;
+                               break;
+                       }
 
-       /* g_signal_handlers_disconnect_by_func doesn't work if you
-        * have a metamarshal, meaning it doesn't work with
-        * soup_message_add_header_handler()
-        */
-       g_signal_handlers_disconnect_matched (item->msg, G_SIGNAL_MATCH_DATA,
-                                             0, 0, NULL, NULL, item);
-       g_signal_emit (session, signals[REQUEST_UNQUEUED], 0, item->msg);
-       soup_message_queue_item_unref (item);
+                       item->state = SOUP_MESSAGE_RUNNING;
+
+                       soup_session_send_queue_item (session, item, message_completed);
+
+                       if (item->new_api) {
+                               if (item->async)
+                                       async_send_request_running (session, item);
+                               return;
+                       }
+                       break;
+
+               case SOUP_MESSAGE_RUNNING:
+                       if (item->async)
+                               return;
+
+                       g_warn_if_fail (item->new_api);
+                       item->state = SOUP_MESSAGE_FINISHING;
+                       break;
+
+               case SOUP_MESSAGE_RESTARTING:
+                       item->state = SOUP_MESSAGE_STARTING;
+                       soup_message_restarted (item->msg);
+                       break;
+
+               case SOUP_MESSAGE_FINISHING:
+                       item->state = SOUP_MESSAGE_FINISHED;
+                       soup_message_finished (item->msg);
+                       if (item->state != SOUP_MESSAGE_FINISHED) {
+                               g_return_if_fail (!item->new_api);
+                               break;
+                       }
+
+                       soup_session_unqueue_item (session, item);
+                       if (item->async && item->callback)
+                               item->callback (session, item->msg, item->callback_data);
+                       return;
+
+               default:
+                       /* Nothing to do with this message in any
+                        * other state.
+                        */
+                       g_warn_if_fail (item->async);
+                       return;
+               }
+       } while (loop && item->state != SOUP_MESSAGE_FINISHED);
 }
 
 static void
-proxy_connection_event (SoupConnection      *conn,
-                       GSocketClientEvent   event,
-                       GIOStream           *connection,
-                       gpointer             user_data)
+async_run_queue (SoupSession *session)
 {
-       SoupMessageQueueItem *item = user_data;
+       SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
+       SoupMessageQueueItem *item;
+       SoupMessage *msg;
+       gboolean try_cleanup = TRUE, should_cleanup = FALSE;
 
-       soup_message_network_event (item->msg, event, connection);
-}
+       g_object_ref (session);
+       soup_session_cleanup_connections (session, FALSE);
 
-void
-soup_session_set_item_connection (SoupSession          *session,
-                                 SoupMessageQueueItem *item,
-                                 SoupConnection       *conn)
-{
-       if (item->conn) {
-               g_signal_handlers_disconnect_by_func (item->conn, proxy_connection_event, item);
-               g_object_unref (item->conn);
-       }
+ try_again:
+       for (item = soup_message_queue_first (priv->queue);
+            item;
+            item = soup_message_queue_next (priv->queue, item)) {
+               msg = item->msg;
 
-       item->conn = conn;
+               /* CONNECT messages are handled specially */
+               if (msg->method == SOUP_METHOD_CONNECT)
+                       continue;
 
-       if (item->conn) {
-               g_object_ref (item->conn);
-               g_signal_connect (item->conn, "event",
-                                 G_CALLBACK (proxy_connection_event), item);
+               if (item->async_context != soup_session_get_async_context (session))
+                       continue;
+
+               soup_session_process_queue_item (session, item, &should_cleanup, TRUE);
+       }
+
+       if (try_cleanup && should_cleanup) {
+               /* There is at least one message in the queue that
+                * could be sent if we cleanupd an idle connection from
+                * some other server.
+                */
+               if (soup_session_cleanup_connections (session, TRUE)) {
+                       try_cleanup = should_cleanup = FALSE;
+                       goto try_again;
+               }
        }
+
+       g_object_unref (session);
 }
 
-void
-soup_session_set_item_status (SoupSession          *session,
-                             SoupMessageQueueItem *item,
-                             guint                 status_code)
+static gboolean
+idle_run_queue (gpointer user_data)
 {
-       SoupURI *uri;
-       char *msg;
+       SoupSessionPrivate *priv = user_data;
+       GSource *source;
 
-       switch (status_code) {
-       case SOUP_STATUS_CANT_RESOLVE:
-       case SOUP_STATUS_CANT_CONNECT:
-               uri = soup_message_get_uri (item->msg);
-               msg = g_strdup_printf ("%s (%s)",
-                                      soup_status_get_phrase (status_code),
-                                      uri->host);
-               soup_message_set_status_full (item->msg, status_code, msg);
-               g_free (msg);
-               break;
+       if (priv->disposed)
+               return FALSE;
 
-       case SOUP_STATUS_CANT_RESOLVE_PROXY:
-       case SOUP_STATUS_CANT_CONNECT_PROXY:
-               if (item->proxy_uri && item->proxy_uri->host) {
-                       msg = g_strdup_printf ("%s (%s)",
-                                              soup_status_get_phrase (status_code),
-                                              item->proxy_uri->host);
-                       soup_message_set_status_full (item->msg, status_code, msg);
-                       g_free (msg);
-                       break;
-               }
-               soup_message_set_status (item->msg, status_code);
-               break;
+       source = g_main_current_source ();
+       priv->run_queue_sources = g_slist_remove (priv->run_queue_sources, source);
 
-       case SOUP_STATUS_SSL_FAILED:
-               if (!g_tls_backend_supports_tls (g_tls_backend_get_default ())) {
-                       soup_message_set_status_full (item->msg, status_code,
-                                                     "TLS/SSL support not available; install glib-networking");
-               } else
-                       soup_message_set_status (item->msg, status_code);
-               break;
+       /* Ensure that the source is destroyed before running the queue */
+       g_source_destroy (source);
+       g_source_unref (source);
 
-       default:
-               soup_message_set_status (item->msg, status_code);
-               break;
-       }
+       g_assert (priv->session);
+       async_run_queue (priv->session);
+       return FALSE;
 }
 
 /**
@@ -1517,7 +1857,7 @@ void
 soup_session_queue_message (SoupSession *session, SoupMessage *msg,
                            SoupSessionCallback callback, gpointer user_data)
 {
-       g_return_if_fail (SOUP_IS_SESSION (session));
+       g_return_if_fail (SOUP_IS_SESSION_ASYNC (session) || SOUP_IS_SESSION_SYNC (session));
        g_return_if_fail (SOUP_IS_MESSAGE (msg));
 
        SOUP_SESSION_GET_CLASS (session)->queue_message (session, msg,
@@ -1557,7 +1897,6 @@ soup_session_requeue_message (SoupSession *session, SoupMessage *msg)
        SOUP_SESSION_GET_CLASS (session)->requeue_message (session, msg);
 }
 
-
 /**
  * soup_session_send_message:
  * @session: a #SoupSession
@@ -1574,7 +1913,7 @@ soup_session_requeue_message (SoupSession *session, SoupMessage *msg)
 guint
 soup_session_send_message (SoupSession *session, SoupMessage *msg)
 {
-       g_return_val_if_fail (SOUP_IS_SESSION (session), SOUP_STATUS_MALFORMED);
+       g_return_val_if_fail (SOUP_IS_SESSION_ASYNC (session) || SOUP_IS_SESSION_SYNC (session), SOUP_STATUS_MALFORMED);
        g_return_val_if_fail (SOUP_IS_MESSAGE (msg), SOUP_STATUS_MALFORMED);
 
        return SOUP_SESSION_GET_CLASS (session)->send_message (session, msg);
@@ -1609,6 +1948,48 @@ soup_session_pause_message (SoupSession *session,
        soup_message_queue_item_unref (item);
 }
 
+static void
+soup_session_real_kick_queue (SoupSession *session)
+{
+       SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
+       SoupMessageQueueItem *item;
+       gboolean have_sync_items = FALSE;
+
+       if (priv->disposed)
+               return;
+
+       for (item = soup_message_queue_first (priv->queue);
+            item;
+            item = soup_message_queue_next (priv->queue, item)) {
+               if (item->async) {
+                       GSource *source;
+
+                       /* We use priv rather than session as the
+                        * source data, because other parts of libsoup
+                        * (or the calling app) may have sources using
+                        * the session as the source data.
+                        */
+                       source = g_main_context_find_source_by_user_data (item->async_context, priv);
+                       if (!source) {
+                               source = soup_add_completion_reffed (item->async_context,
+                                                                    idle_run_queue, priv);
+                               priv->run_queue_sources = g_slist_prepend (priv->run_queue_sources,
+                                                                          source);
+                       }
+               } else
+                       have_sync_items = TRUE;
+       }
+
+       if (have_sync_items)
+               g_cond_broadcast (&priv->conn_cond);
+}
+
+void
+soup_session_kick_queue (SoupSession *session)
+{
+       SOUP_SESSION_GET_CLASS (session)->kick (session);
+}
+
 /**
  * soup_session_unpause_message:
  * @session: a #SoupSession
@@ -1640,7 +2021,7 @@ soup_session_unpause_message (SoupSession *session,
                soup_message_io_unpause (msg);
        soup_message_queue_item_unref (item);
 
-       SOUP_SESSION_GET_CLASS (session)->kick (session);
+       soup_session_kick_queue (session);
 }
 
 
@@ -1657,6 +2038,7 @@ soup_session_real_cancel_message (SoupSession *session, SoupMessage *msg, guint
        soup_message_set_status (msg, status_code);
        g_cancellable_cancel (item->cancellable);
 
+       soup_session_kick_queue (item->session);
        soup_message_queue_item_unref (item);
 }
 
@@ -1716,13 +2098,52 @@ soup_session_real_flush_queue (SoupSession *session)
 {
        SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
        SoupMessageQueueItem *item;
+       GHashTable *current = NULL;
+       gboolean done = FALSE;
+
+       if (SOUP_IS_SESSION_SYNC (session)) {
+               /* Record the current contents of the queue */
+               current = g_hash_table_new (NULL, NULL);
+               for (item = soup_message_queue_first (priv->queue);
+                    item;
+                    item = soup_message_queue_next (priv->queue, item))
+                       g_hash_table_insert (current, item, item);
+       }
 
+       /* Cancel everything */
        for (item = soup_message_queue_first (priv->queue);
             item;
             item = soup_message_queue_next (priv->queue, item)) {
                soup_session_cancel_message (session, item->msg,
                                             SOUP_STATUS_CANCELLED);
        }
+
+       if (SOUP_IS_SESSION_SYNC (session)) {
+               /* Wait until all of the items in @current have been
+                * removed from the queue. (This is not the same as
+                * "wait for the queue to be empty", because the app
+                * may queue new requests in response to the
+                * cancellation of the old ones. We don't try to
+                * cancel those requests as well, since we'd likely
+                * just end up looping forever.)
+                */
+               g_mutex_lock (&priv->conn_lock);
+               do {
+                       done = TRUE;
+                       for (item = soup_message_queue_first (priv->queue);
+                            item;
+                            item = soup_message_queue_next (priv->queue, item)) {
+                               if (g_hash_table_lookup (current, item))
+                                       done = FALSE;
+                       }
+
+                       if (!done)
+                               g_cond_wait (&priv->conn_cond, &priv->conn_lock);
+               } while (!done);
+               g_mutex_unlock (&priv->conn_lock);
+
+               g_hash_table_destroy (current);
+       }
 }
 
 /**
@@ -2107,6 +2528,7 @@ soup_session_class_init (SoupSessionClass *session_class)
        session_class->cancel_message = soup_session_real_cancel_message;
        session_class->auth_required = soup_session_real_auth_required;
        session_class->flush_queue = soup_session_real_flush_queue;
+       session_class->kick = soup_session_real_kick_queue;
 
        /* virtual method override */
        object_class->dispose = soup_session_dispose;
@@ -2808,3 +3230,384 @@ soup_session_class_init (SoupSessionClass *session_class)
                                    G_TYPE_STRV,
                                    G_PARAM_READWRITE));
 }
+
+
+/* send_request_async */
+
+static void
+async_send_request_return_result (SoupMessageQueueItem *item,
+                                 gpointer stream, GError *error)
+{
+       GTask *task;
+
+       g_return_if_fail (item->task != NULL);
+
+       g_signal_handlers_disconnect_matched (item->msg, G_SIGNAL_MATCH_DATA,
+                                             0, 0, NULL, NULL, item);
+
+       task = item->task;
+       item->task = NULL;
+
+       if (item->io_source) {
+               g_source_destroy (item->io_source);
+               g_clear_pointer (&item->io_source, g_source_unref);
+       }
+
+       if (error)
+               g_task_return_error (task, error);
+       else if (SOUP_STATUS_IS_TRANSPORT_ERROR (item->msg->status_code)) {
+               if (stream)
+                       g_object_unref (stream);
+               g_task_return_new_error (task, SOUP_HTTP_ERROR,
+                                        item->msg->status_code,
+                                        "%s",
+                                        item->msg->reason_phrase);
+       } else
+               g_task_return_pointer (task, stream, g_object_unref);
+       g_object_unref (task);
+}
+
+static void
+async_send_request_restarted (SoupMessage *msg, gpointer user_data)
+{
+       SoupMessageQueueItem *item = user_data;
+
+       /* We won't be needing this, then. */
+       g_object_set_data (G_OBJECT (item->msg), "SoupSession:ostream", NULL);
+       item->io_started = FALSE;
+}
+
+static void
+async_send_request_finished (SoupMessage *msg, gpointer user_data)
+{
+       SoupMessageQueueItem *item = user_data;
+       GMemoryOutputStream *mostream;
+       GInputStream *istream = NULL;
+       GError *error = NULL;
+
+       if (!item->task) {
+               /* Something else already took care of it. */
+               return;
+       }
+
+       mostream = g_object_get_data (G_OBJECT (item->task), "SoupSession:ostream");
+       if (mostream) {
+               gpointer data;
+               gssize size;
+
+               /* We thought it would be requeued, but it wasn't, so
+                * return the original body.
+                */
+               size = g_memory_output_stream_get_data_size (mostream);
+               data = size ? g_memory_output_stream_steal_data (mostream) : g_strdup ("");
+               istream = g_memory_input_stream_new_from_data (data, size, g_free);
+       } else if (item->io_started) {
+               /* The message finished before becoming readable. This
+                * will happen, eg, if it's cancelled from got-headers.
+                * Do nothing; the op will complete via read_ready_cb()
+                * after we return;
+                */
+               return;
+       } else {
+               /* The message finished before even being started;
+                * probably a tunnel connect failure.
+                */
+               istream = g_memory_input_stream_new ();
+       }
+
+       async_send_request_return_result (item, istream, error);
+}
+
+static void
+send_async_spliced (GObject *source, GAsyncResult *result, gpointer user_data)
+{
+       SoupMessageQueueItem *item = user_data;
+       GInputStream *istream = g_object_get_data (source, "istream");
+       GError *error = NULL;
+
+       /* It should be safe to call the sync close() method here since
+        * the message body has already been written.
+        */
+       g_input_stream_close (istream, NULL, NULL);
+       g_object_unref (istream);
+
+       /* If the message was cancelled, it will be completed via other means */
+       if (g_cancellable_is_cancelled (item->cancellable) ||
+           !item->task) {
+               soup_message_queue_item_unref (item);
+               return;
+       }
+
+       if (g_output_stream_splice_finish (G_OUTPUT_STREAM (source),
+                                          result, &error) == -1) {
+               async_send_request_return_result (item, NULL, error);
+               soup_message_queue_item_unref (item);
+               return;
+       }
+
+       /* Otherwise either restarted or finished will eventually be called. */
+       soup_session_kick_queue (item->session);
+       soup_message_queue_item_unref (item);
+}
+
+static void
+send_async_maybe_complete (SoupMessageQueueItem *item,
+                          GInputStream         *stream)
+{
+       if (item->msg->status_code == SOUP_STATUS_UNAUTHORIZED ||
+           item->msg->status_code == SOUP_STATUS_PROXY_UNAUTHORIZED ||
+           soup_session_would_redirect (item->session, item->msg)) {
+               GOutputStream *ostream;
+
+               /* Message may be requeued, so gather the current message body... */
+               ostream = g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
+               g_object_set_data_full (G_OBJECT (item->task), "SoupSession:ostream",
+                                       ostream, g_object_unref);
+
+               g_object_set_data (G_OBJECT (ostream), "istream", stream);
+
+               /* Give the splice op its own ref on item */
+               soup_message_queue_item_ref (item);
+               g_output_stream_splice_async (ostream, stream,
+                                             /* We can't use CLOSE_SOURCE because it
+                                              * might get closed in the wrong thread.
+                                              */
+                                             G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
+                                             G_PRIORITY_DEFAULT,
+                                             item->cancellable,
+                                             send_async_spliced, item);
+               return;
+       }
+
+       async_send_request_return_result (item, stream, NULL);
+}
+
+static void try_run_until_read (SoupMessageQueueItem *item);
+
+static gboolean
+read_ready_cb (SoupMessage *msg, gpointer user_data)
+{
+       SoupMessageQueueItem *item = user_data;
+
+       g_clear_pointer (&item->io_source, g_source_unref);
+       try_run_until_read (item);
+       return FALSE;
+}
+
+static void
+try_run_until_read (SoupMessageQueueItem *item)
+{
+       GError *error = NULL;
+       GInputStream *stream = NULL;
+
+       if (soup_message_io_run_until_read (item->msg, item->cancellable, &error))
+               stream = soup_message_io_get_response_istream (item->msg, &error);
+       if (stream) {
+               send_async_maybe_complete (item, stream);
+               return;
+       }
+
+       if (g_error_matches (error, SOUP_HTTP_ERROR, SOUP_STATUS_TRY_AGAIN)) {
+               item->state = SOUP_MESSAGE_RESTARTING;
+               soup_message_io_finished (item->msg);
+               g_error_free (error);
+               return;
+       }
+
+       if (!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+               if (item->state != SOUP_MESSAGE_FINISHED) {
+                       if (soup_message_io_in_progress (item->msg))
+                               soup_message_io_finished (item->msg);
+                       item->state = SOUP_MESSAGE_FINISHING;
+                       soup_session_process_queue_item (item->session, item, NULL, FALSE);
+               }
+               async_send_request_return_result (item, NULL, error);
+               return;
+       }
+
+       g_clear_error (&error);
+       item->io_source = soup_message_io_get_source (item->msg, item->cancellable,
+                                                     read_ready_cb, item);
+       g_source_attach (item->io_source, soup_session_get_async_context (item->session));
+}
+
+static void
+async_send_request_running (SoupSession *session, SoupMessageQueueItem *item)
+{
+       item->io_started = TRUE;
+       try_run_until_read (item);
+}
+
+void
+soup_session_send_request_async (SoupSession         *session,
+                                SoupMessage         *msg,
+                                GCancellable        *cancellable,
+                                GAsyncReadyCallback  callback,
+                                gpointer             user_data)
+{
+       SoupMessageQueueItem *item;
+       gboolean use_thread_context;
+
+       g_return_if_fail (SOUP_IS_SESSION (session));
+       g_return_if_fail (!SOUP_IS_SESSION_SYNC (session));
+
+       g_object_get (G_OBJECT (session),
+                     SOUP_SESSION_USE_THREAD_CONTEXT, &use_thread_context,
+                     NULL);
+       g_return_if_fail (use_thread_context);
+
+       item = soup_session_append_queue_item (session, msg, TRUE, TRUE,
+                                              NULL, NULL);
+       g_signal_connect (msg, "restarted",
+                         G_CALLBACK (async_send_request_restarted), item);
+       g_signal_connect (msg, "finished",
+                         G_CALLBACK (async_send_request_finished), item);
+
+       item->new_api = TRUE;
+       item->task = g_task_new (session, cancellable, callback, user_data);
+       g_task_set_task_data (item->task, item, (GDestroyNotify) soup_message_queue_item_unref);
+
+       if (cancellable) {
+               g_object_unref (item->cancellable);
+               item->cancellable = g_object_ref (cancellable);
+       }
+
+       soup_session_kick_queue (session);
+}
+
+GInputStream *
+soup_session_send_request_finish (SoupSession   *session,
+                                 GAsyncResult  *result,
+                                 GError       **error)
+{
+       GTask *task;
+
+       g_return_val_if_fail (SOUP_IS_SESSION (session), NULL);
+       g_return_val_if_fail (!SOUP_IS_SESSION_SYNC (session), NULL);
+       g_return_val_if_fail (g_task_is_valid (result, session), NULL);
+
+       task = G_TASK (result);
+       if (g_task_had_error (task)) {
+               SoupMessageQueueItem *item = g_task_get_task_data (task);
+
+               if (soup_message_io_in_progress (item->msg))
+                       soup_message_io_finished (item->msg);
+               else if (item->state != SOUP_MESSAGE_FINISHED)
+                       item->state = SOUP_MESSAGE_FINISHING;
+
+               if (item->state != SOUP_MESSAGE_FINISHED)
+                       soup_session_process_queue_item (session, item, NULL, FALSE);
+       }
+
+       return g_task_propagate_pointer (task, error);
+}
+
+GInputStream *
+soup_session_send_request (SoupSession   *session,
+                          SoupMessage   *msg,
+                          GCancellable  *cancellable,
+                          GError       **error)
+{
+       SoupMessageQueueItem *item;
+       GInputStream *stream = NULL;
+       GOutputStream *ostream;
+       GMemoryOutputStream *mostream;
+       gssize size;
+       GError *my_error = NULL;
+
+       g_return_val_if_fail (SOUP_IS_SESSION (session), NULL);
+       g_return_val_if_fail (!SOUP_IS_SESSION_ASYNC (session), NULL);
+
+       item = soup_session_append_queue_item (session, msg, FALSE, TRUE,
+                                              NULL, NULL);
+
+       item->new_api = TRUE;
+       if (cancellable) {
+               g_object_unref (item->cancellable);
+               item->cancellable = g_object_ref (cancellable);
+       }
+
+       while (!stream) {
+               /* Get a connection, etc */
+               soup_session_process_queue_item (session, item, NULL, TRUE);
+               if (item->state != SOUP_MESSAGE_RUNNING)
+                       break;
+
+               /* Send request, read headers */
+               if (!soup_message_io_run_until_read (msg, item->cancellable, &my_error)) {
+                       if (g_error_matches (my_error, SOUP_HTTP_ERROR, SOUP_STATUS_TRY_AGAIN)) {
+                               item->state = SOUP_MESSAGE_RESTARTING;
+                               soup_message_io_finished (item->msg);
+                               g_clear_error (&my_error);
+                               continue;
+                       } else
+                               break;
+               }
+
+               stream = soup_message_io_get_response_istream (msg, &my_error);
+               if (!stream)
+                       break;
+
+               /* Break if the message doesn't look likely-to-be-requeued */
+               if (msg->status_code != SOUP_STATUS_UNAUTHORIZED &&
+                   msg->status_code != SOUP_STATUS_PROXY_UNAUTHORIZED &&
+                   !soup_session_would_redirect (session, msg))
+                       break;
+
+               /* Gather the current message body... */
+               ostream = g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
+               if (g_output_stream_splice (ostream, stream,
+                                           G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE |
+                                           G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
+                                           item->cancellable, &my_error) == -1) {
+                       g_object_unref (stream);
+                       g_object_unref (ostream);
+                       stream = NULL;
+                       break;
+               }
+               g_object_unref (stream);
+               stream = NULL;
+
+               /* If the message was requeued, loop */
+               if (item->state == SOUP_MESSAGE_RESTARTING) {
+                       g_object_unref (ostream);
+                       continue;
+               }
+
+               /* Not requeued, so return the original body */
+               mostream = G_MEMORY_OUTPUT_STREAM (ostream);
+               size = g_memory_output_stream_get_data_size (mostream);
+               stream = g_memory_input_stream_new ();
+               if (size) {
+                       g_memory_input_stream_add_data (G_MEMORY_INPUT_STREAM (stream),
+                                                       g_memory_output_stream_steal_data (mostream),
+                                                       size, g_free);
+               }
+               g_object_unref (ostream);
+       }
+
+       if (my_error)
+               g_propagate_error (error, my_error);
+       else if (SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code)) {
+               if (stream) {
+                       g_object_unref (stream);
+                       stream = NULL;
+               }
+               g_set_error_literal (error, SOUP_HTTP_ERROR, msg->status_code,
+                                    msg->reason_phrase);
+       } else if (!stream)
+               stream = g_memory_input_stream_new ();
+
+       if (!stream) {
+               if (soup_message_io_in_progress (msg))
+                       soup_message_io_finished (msg);
+               else if (item->state != SOUP_MESSAGE_FINISHED)
+                       item->state = SOUP_MESSAGE_FINISHING;
+
+               if (item->state != SOUP_MESSAGE_FINISHED)
+                       soup_session_process_queue_item (session, item, NULL, TRUE);
+       }
+
+       soup_message_queue_item_unref (item);
+       return stream;
+}
index bc45e9f..a437937 100644 (file)
@@ -372,21 +372,22 @@ do_test_for_thread_and_context (SoupSession *session, const char *base_uri)
 }
 
 static void
-do_simple_test (const char *uri)
+do_simple_test (const char *uri, gboolean plain_session)
 {
        SoupSession *session;
 
-       debug_printf (1, "Simple streaming test\n");
+       debug_printf (1, "Simple streaming test with %s\n",
+                     plain_session ? "SoupSession" : "SoupSessionAsync");
 
-       session = soup_test_session_new (SOUP_TYPE_SESSION_ASYNC,
+       session = soup_test_session_new (plain_session ? SOUP_TYPE_SESSION : SOUP_TYPE_SESSION_ASYNC,
                                         SOUP_SESSION_USE_THREAD_CONTEXT, TRUE,
                                         NULL);
        do_test_for_thread_and_context (session, uri);
        soup_test_session_abort_unref (session);
 }
 
-static gpointer
-do_test_with_context (const char *uri)
+static void
+do_test_with_context_and_type (const char *uri, gboolean plain_session)
 {
        GMainContext *async_context;
        SoupSession *session;
@@ -394,7 +395,7 @@ do_test_with_context (const char *uri)
        async_context = g_main_context_new ();
        g_main_context_push_thread_default (async_context);
 
-       session = soup_test_session_new (SOUP_TYPE_SESSION_ASYNC,
+       session = soup_test_session_new (plain_session ? SOUP_TYPE_SESSION : SOUP_TYPE_SESSION_ASYNC,
                                         SOUP_SESSION_ASYNC_CONTEXT, async_context,
                                         SOUP_SESSION_USE_THREAD_CONTEXT, TRUE,
                                         NULL);
@@ -404,25 +405,43 @@ do_test_with_context (const char *uri)
 
        g_main_context_pop_thread_default (async_context);
        g_main_context_unref (async_context);
+}
+
+static gpointer
+do_test_with_context (gpointer uri)
+{
+       do_test_with_context_and_type (uri, FALSE);
+       return NULL;
+}
+
+static gpointer
+do_plain_test_with_context (gpointer uri)
+{
+       do_test_with_context_and_type (uri, TRUE);
        return NULL;
 }
 
 static void
-do_context_test (const char *uri)
+do_context_test (const char *uri, gboolean plain_session)
 {
-       debug_printf (1, "Streaming with a non-default-context\n");
-       do_test_with_context (uri);
+       debug_printf (1, "Streaming with a non-default-context with %s\n",
+                     plain_session ? "SoupSession" : "SoupSessionAsync");
+       if (plain_session)
+               do_plain_test_with_context ((gpointer)uri);
+       else
+               do_test_with_context ((gpointer)uri);
 }
 
 static void
-do_thread_test (const char *uri)
+do_thread_test (const char *uri, gboolean plain_session)
 {
        GThread *thread;
 
-       debug_printf (1, "Streaming in another thread\n");
+       debug_printf (1, "Streaming in another thread with %s\n",
+                     plain_session ? "SoupSession" : "SoupSessionAsync");
 
        thread = g_thread_new ("do_test_with_context",
-                              (GThreadFunc)do_test_with_context,
+                              plain_session ? do_plain_test_with_context : do_test_with_context,
                               (gpointer)uri);
        g_thread_join (thread);
 }
@@ -542,16 +561,17 @@ do_sync_request (SoupSession *session, SoupRequest *request,
 }
 
 static void
-do_sync_test (const char *uri_string)
+do_sync_test (const char *uri_string, gboolean plain_session)
 {
        SoupSession *session;
        SoupRequester *requester;
        SoupRequest *request;
        SoupURI *uri;
 
-       debug_printf (1, "Sync streaming\n");
+       debug_printf (1, "Sync streaming with %s\n",
+                     plain_session ? "SoupSession" : "SoupSessionSync");
 
-       session = soup_test_session_new (SOUP_TYPE_SESSION_SYNC, NULL);
+       session = soup_test_session_new (plain_session ? SOUP_TYPE_SESSION : SOUP_TYPE_SESSION_SYNC, NULL);
        requester = soup_requester_new ();
        soup_session_add_feature (session, SOUP_SESSION_FEATURE (requester));
        g_object_unref (requester);
@@ -614,10 +634,15 @@ main (int argc, char **argv)
 
        uri = g_strdup_printf ("http://127.0.0.1:%u/foo", soup_server_get_port (server));
 
-       do_simple_test (uri);
-       do_thread_test (uri);
-       do_context_test (uri);
-       do_sync_test (uri);
+       do_simple_test (uri, FALSE);
+       do_thread_test (uri, FALSE);
+       do_context_test (uri, FALSE);
+       do_sync_test (uri, FALSE);
+
+       do_simple_test (uri, TRUE);
+       do_thread_test (uri, TRUE);
+       do_context_test (uri, TRUE);
+       do_sync_test (uri, TRUE);
 
        g_free (uri);
        soup_buffer_free (response);