#include "soup-message-queue.h"
#include "soup-misc.h"
#include "soup-password-manager.h"
-#include "soup-proxy-uri-resolver.h"
#include "soup-uri.h"
/**
static void run_queue (SoupSessionAsync *sa);
static void do_idle_run_queue (SoupSession *session);
+static void send_request_running (SoupSession *session, SoupMessageQueueItem *item);
+static void send_request_restarted (SoupSession *session, SoupMessageQueueItem *item);
+static void send_request_finished (SoupSession *session, SoupMessageQueueItem *item);
+
static void queue_message (SoupSession *session, SoupMessage *req,
SoupSessionCallback callback, gpointer user_data);
static guint send_message (SoupSession *session, SoupMessage *req);
G_DEFINE_TYPE (SoupSessionAsync, soup_session_async, SOUP_TYPE_SESSION)
typedef struct {
- GHashTable *idle_run_queue_sources;
+ SoupSessionAsync *sa;
+ gboolean disposed;
} SoupSessionAsyncPrivate;
#define SOUP_SESSION_ASYNC_GET_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), SOUP_TYPE_SESSION_ASYNC, SoupSessionAsyncPrivate))
static void
-destroy_unref_source (gpointer source)
-{
- g_source_destroy (source);
- g_source_unref (source);
-}
-
-static void
soup_session_async_init (SoupSessionAsync *sa)
{
SoupSessionAsyncPrivate *priv = SOUP_SESSION_ASYNC_GET_PRIVATE (sa);
- priv->idle_run_queue_sources =
- g_hash_table_new_full (NULL, NULL, NULL, destroy_unref_source);
+ priv->sa = sa;
}
static void
{
SoupSessionAsyncPrivate *priv = SOUP_SESSION_ASYNC_GET_PRIVATE (object);
- if (priv->idle_run_queue_sources) {
- g_hash_table_destroy (priv->idle_run_queue_sources);
- priv->idle_run_queue_sources = NULL;
- }
+ priv->disposed = TRUE;
G_OBJECT_CLASS (soup_session_async_parent_class)->dispose (object);
}
return session;
}
-static gboolean
-item_failed (SoupMessageQueueItem *item, guint status)
-{
- if (item->removed) {
- soup_message_queue_item_unref (item);
- return TRUE;
- }
-
- if (!SOUP_STATUS_IS_SUCCESSFUL (status)) {
- item->state = SOUP_MESSAGE_FINISHING;
- if (!item->msg->status_code)
- soup_session_set_item_status (item->session, item, status);
- do_idle_run_queue (item->session);
- soup_message_queue_item_unref (item);
- return TRUE;
- }
-
- return FALSE;
-}
-
-static void
-resolved_proxy_addr (SoupAddress *addr, guint status, gpointer user_data)
-{
- SoupMessageQueueItem *item = user_data;
- SoupSession *session = item->session;
-
- if (item_failed (item, soup_status_proxify (status)))
- return;
-
- item->proxy_addr = g_object_ref (addr);
- item->state = SOUP_MESSAGE_AWAITING_CONNECTION;
-
- soup_message_queue_item_unref (item);
-
- /* If we got here we know session still exists */
- run_queue ((SoupSessionAsync *)session);
-}
-
-static void
-resolved_proxy_uri (SoupProxyURIResolver *proxy_resolver,
- guint status, SoupURI *proxy_uri, gpointer user_data)
-{
- SoupMessageQueueItem *item = user_data;
- SoupSession *session = item->session;
-
- if (item_failed (item, status))
- return;
-
- if (proxy_uri) {
- SoupAddress *proxy_addr;
-
- item->state = SOUP_MESSAGE_RESOLVING_PROXY_ADDRESS;
-
- item->proxy_uri = soup_uri_copy (proxy_uri);
- proxy_addr = soup_address_new (proxy_uri->host,
- proxy_uri->port);
- soup_address_resolve_async (proxy_addr,
- soup_session_get_async_context (session),
- item->cancellable,
- resolved_proxy_addr, item);
- g_object_unref (proxy_addr);
- return;
- }
-
- item->state = SOUP_MESSAGE_AWAITING_CONNECTION;
- soup_message_queue_item_unref (item);
-
- /* If we got here we know session still exists */
- run_queue ((SoupSessionAsync *)session);
-}
-
-static void
-resolve_proxy_addr (SoupMessageQueueItem *item,
- SoupProxyURIResolver *proxy_resolver)
-{
- item->state = SOUP_MESSAGE_RESOLVING_PROXY_URI;
-
- soup_message_queue_item_ref (item);
- soup_proxy_uri_resolver_get_proxy_uri_async (
- proxy_resolver, soup_message_get_uri (item->msg),
- soup_session_get_async_context (item->session),
- item->cancellable, resolved_proxy_uri, item);
-}
-
static void
connection_closed (SoupConnection *conn, gpointer session)
{
{
SoupMessageQueueItem *item = user_data;
+ do_idle_run_queue (item->session);
+
if (item->state != SOUP_MESSAGE_RESTARTING)
item->state = SOUP_MESSAGE_FINISHING;
- do_idle_run_queue (item->session);
}
static void
{
SoupMessageQueueItem *item = user_data;
SoupSession *session = item->session;
- SoupAddress *tunnel_addr;
if (item->state != SOUP_MESSAGE_CONNECTING) {
soup_connection_disconnect (conn);
return;
}
- tunnel_addr = soup_connection_get_tunnel_addr (conn);
- if (tunnel_addr) {
+ if (soup_connection_is_tunnelled (conn)) {
SoupMessageQueueItem *tunnel_item;
item->state = SOUP_MESSAGE_TUNNELING;
gboolean loop)
{
SoupSession *session = item->session;
- SoupProxyURIResolver *proxy_resolver;
if (item->async_context != soup_session_get_async_context (session))
return;
switch (item->state) {
case SOUP_MESSAGE_STARTING:
- proxy_resolver = (SoupProxyURIResolver *)soup_session_get_feature_for_message (session, SOUP_TYPE_PROXY_URI_RESOLVER, item->msg);
- if (!proxy_resolver) {
- item->state = SOUP_MESSAGE_AWAITING_CONNECTION;
- break;
- }
- resolve_proxy_addr (item, proxy_resolver);
- return;
+ item->state = SOUP_MESSAGE_AWAITING_CONNECTION;
+ break;
case SOUP_MESSAGE_AWAITING_CONNECTION:
if (!soup_session_get_connection (session, item, should_prune))
case SOUP_MESSAGE_READY:
item->state = SOUP_MESSAGE_RUNNING;
soup_session_send_queue_item (session, item, message_completed);
+ if (item->new_api)
+ send_request_running (session, item);
break;
case SOUP_MESSAGE_RESTARTING:
item->state = SOUP_MESSAGE_STARTING;
soup_message_restarted (item->msg);
+ if (item->new_api)
+ send_request_restarted (session, item);
break;
case SOUP_MESSAGE_FINISHING:
soup_session_unqueue_item (session, item);
if (item->callback)
item->callback (session, item->msg, item->callback_data);
+ else if (item->new_api)
+ send_request_finished (session, item);
g_object_unref (item->msg);
do_idle_run_queue (session);
g_object_unref (session);
}
static gboolean
-idle_run_queue (gpointer sa)
+idle_run_queue (gpointer user_data)
{
- SoupSessionAsyncPrivate *priv = SOUP_SESSION_ASYNC_GET_PRIVATE (sa);
+ SoupSessionAsyncPrivate *priv = user_data;
- if (!priv->idle_run_queue_sources)
+ if (priv->disposed)
return FALSE;
- g_hash_table_remove (priv->idle_run_queue_sources,
- soup_session_get_async_context (sa));
- run_queue (sa);
+ /* Ensure that the source is destroyed before running the queue */
+ g_source_destroy (g_main_current_source ());
+
+ run_queue (priv->sa);
return FALSE;
}
do_idle_run_queue (SoupSession *session)
{
SoupSessionAsyncPrivate *priv = SOUP_SESSION_ASYNC_GET_PRIVATE (session);
+ GMainContext *async_context = soup_session_get_async_context (session);
+ GSource *source;
- if (!priv->idle_run_queue_sources)
+ if (priv->disposed)
return;
- if (!g_hash_table_lookup (priv->idle_run_queue_sources,
- soup_session_get_async_context (session))) {
- GMainContext *async_context = soup_session_get_async_context (session);
- GSource *source = soup_add_completion (async_context, idle_run_queue, session);
+ /* We use priv rather than session as the source data, because
+ * other parts of libsoup (or the calling app) may have sources
+ * using the session as the source data.
+ */
+
+ source = g_main_context_find_source_by_user_data (async_context, priv);
+ if (source)
+ return;
- g_hash_table_insert (priv->idle_run_queue_sources,
- async_context, g_source_ref (source));
- }
+ source = soup_add_completion (async_context, idle_run_queue, priv);
}
static void
{
do_idle_run_queue (session);
}
+
+
+static void
+send_request_return_result (SoupMessageQueueItem *item,
+ gpointer stream, GError *error)
+{
+ GSimpleAsyncResult *simple;
+
+ simple = item->result;
+ item->result = NULL;
+
+ if (error)
+ g_simple_async_result_take_error (simple, error);
+ else if (SOUP_STATUS_IS_TRANSPORT_ERROR (item->msg->status_code)) {
+ if (stream)
+ g_object_unref (stream);
+ g_simple_async_result_set_error (simple,
+ SOUP_HTTP_ERROR,
+ item->msg->status_code,
+ "%s",
+ item->msg->reason_phrase);
+ } else
+ g_simple_async_result_set_op_res_gpointer (simple, stream, g_object_unref);
+
+ g_simple_async_result_complete (simple);
+ g_object_unref (simple);
+}
+
+static void
+send_request_restarted (SoupSession *session, SoupMessageQueueItem *item)
+{
+ /* We won't be needing this, then. */
+ g_object_set_data (G_OBJECT (item->msg), "SoupSessionAsync:ostream", NULL);
+ item->io_started = FALSE;
+}
+
+static void
+send_request_finished (SoupSession *session, SoupMessageQueueItem *item)
+{
+ GMemoryOutputStream *mostream;
+ GInputStream *istream = NULL;
+ GError *error = NULL;
+
+ if (!item->result) {
+ /* Something else already took care of it. */
+ return;
+ }
+
+ mostream = g_object_get_data (G_OBJECT (item->msg), "SoupSessionAsync:ostream");
+ if (mostream) {
+ gpointer data;
+ gssize size;
+
+ /* We thought it would be requeued, but it wasn't, so
+ * return the original body.
+ */
+ size = g_memory_output_stream_get_data_size (mostream);
+ data = size ? g_memory_output_stream_steal_data (mostream) : g_strdup ("");
+ istream = g_memory_input_stream_new_from_data (data, size, g_free);
+ } else if (item->io_started) {
+ /* The message finished before becoming readable. This
+ * will happen, eg, if it's cancelled from got-headers.
+ * Do nothing; the op will complete via read_ready_cb()
+ * after we return;
+ */
+ return;
+ } else {
+ /* The message finished before even being started;
+ * probably a tunnel connect failure.
+ */
+ istream = g_memory_input_stream_new ();
+ }
+
+ send_request_return_result (item, istream, error);
+}
+
+static void
+send_async_spliced (GObject *source, GAsyncResult *result, gpointer user_data)
+{
+ SoupMessageQueueItem *item = user_data;
+ GInputStream *istream = g_object_get_data (source, "istream");
+
+ GError *error = NULL;
+
+ /* If the message was cancelled, it will be completed via other means */
+ if (g_cancellable_is_cancelled (item->cancellable) ||
+ !item->result) {
+ soup_message_queue_item_unref (item);
+ return;
+ }
+
+ if (g_output_stream_splice_finish (G_OUTPUT_STREAM (source),
+ result, &error) == -1) {
+ send_request_return_result (item, NULL, error);
+ return;
+ }
+
+ /* Otherwise either restarted or finished will eventually be called.
+ * It should be safe to call the sync close() method here since
+ * the message body has already been written.
+ */
+ g_input_stream_close (istream, NULL, NULL);
+ do_idle_run_queue (item->session);
+ soup_message_queue_item_unref (item);
+}
+
+static void
+send_async_maybe_complete (SoupMessageQueueItem *item,
+ GInputStream *stream)
+{
+ if (item->msg->status_code == SOUP_STATUS_UNAUTHORIZED ||
+ item->msg->status_code == SOUP_STATUS_PROXY_UNAUTHORIZED ||
+ soup_session_would_redirect (item->session, item->msg)) {
+ GOutputStream *ostream;
+
+ /* Message may be requeued, so gather the current message body... */
+ ostream = g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
+ g_object_set_data_full (G_OBJECT (item->msg), "SoupSessionAsync:ostream",
+ ostream, g_object_unref);
+
+ g_object_set_data_full (G_OBJECT (ostream), "istream",
+ stream, g_object_unref);
+
+ /* Give the splice op its own ref on item */
+ soup_message_queue_item_ref (item);
+ g_output_stream_splice_async (ostream, stream,
+ /* We can't use CLOSE_SOURCE because it
+ * might get closed in the wrong thread.
+ */
+ G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
+ G_PRIORITY_DEFAULT,
+ item->cancellable,
+ send_async_spliced, item);
+ return;
+ }
+
+ send_request_return_result (item, stream, NULL);
+}
+
+static void try_run_until_read (SoupMessageQueueItem *item);
+
+static gboolean
+read_ready_cb (SoupMessage *msg, gpointer user_data)
+{
+ SoupMessageQueueItem *item = user_data;
+
+ try_run_until_read (item);
+ return FALSE;
+}
+
+static void
+try_run_until_read (SoupMessageQueueItem *item)
+{
+ GError *error = NULL;
+ GInputStream *stream = NULL;
+ GSource *source;
+
+ if (soup_message_io_run_until_read (item->msg, item->cancellable, &error))
+ stream = soup_message_io_get_response_istream (item->msg, &error);
+ if (stream) {
+ send_async_maybe_complete (item, stream);
+ return;
+ }
+
+ if (!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+ send_request_return_result (item, NULL, error);
+ return;
+ }
+
+ g_clear_error (&error);
+ source = soup_message_io_get_source (item->msg, item->cancellable,
+ read_ready_cb, item);
+ g_source_attach (source, soup_session_get_async_context (item->session));
+ g_source_unref (source);
+}
+
+static void
+send_request_running (SoupSession *session, SoupMessageQueueItem *item)
+{
+ item->io_started = TRUE;
+ try_run_until_read (item);
+}
+
+void
+soup_session_send_request_async (SoupSession *session,
+ SoupMessage *msg,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ SoupMessageQueueItem *item;
+ gboolean use_thread_context;
+
+ g_return_if_fail (SOUP_IS_SESSION_ASYNC (session));
+
+ g_object_get (G_OBJECT (session),
+ SOUP_SESSION_USE_THREAD_CONTEXT, &use_thread_context,
+ NULL);
+ g_return_if_fail (use_thread_context);
+
+ /* Balance out the unref that queuing will eventually do */
+ g_object_ref (msg);
+
+ queue_message (session, msg, NULL, NULL);
+
+ item = soup_message_queue_lookup (soup_session_get_queue (session), msg);
+ g_return_if_fail (item != NULL);
+
+ item->new_api = TRUE;
+ item->result = g_simple_async_result_new (G_OBJECT (session),
+ callback, user_data,
+ soup_session_send_request_async);
+ g_simple_async_result_set_op_res_gpointer (item->result, item, (GDestroyNotify) soup_message_queue_item_unref);
+
+ if (cancellable) {
+ g_object_unref (item->cancellable);
+ item->cancellable = g_object_ref (cancellable);
+ }
+}
+
+GInputStream *
+soup_session_send_request_finish (SoupSession *session,
+ GAsyncResult *result,
+ GError **error)
+{
+ GSimpleAsyncResult *simple;
+
+ g_return_val_if_fail (SOUP_IS_SESSION_ASYNC (session), NULL);
+ g_return_val_if_fail (g_simple_async_result_is_valid (result, G_OBJECT (session), soup_session_send_request_async), NULL);
+
+ simple = G_SIMPLE_ASYNC_RESULT (result);
+ if (g_simple_async_result_propagate_error (simple, error))
+ return NULL;
+ return g_object_ref (g_simple_async_result_get_op_res_gpointer (simple));
+}