Make this more complicated, with a SoupMessageQueueItem to keep track of
authorDan Winship <danw@src.gnome.org>
Fri, 3 Oct 2008 20:17:56 +0000 (20:17 +0000)
committerDan Winship <danw@src.gnome.org>
Fri, 3 Oct 2008 20:17:56 +0000 (20:17 +0000)
* libsoup/soup-message-queue.c: Make this more complicated, with a
SoupMessageQueueItem to keep track of the session's per-message
state. (Part of the process of moving session-related state out of
SoupMessagePrivate.)

* libsoup/soup-session.c: Update to work in terms of
SoupMessageQueueItem

* libsoup/soup-session-async.c:
* libsoup/soup-session-sync.c: use SoupMessageQueueItem (and get
rid of SoupSessionAsyncQueueData and SoupSessionSyncAsyncData).

svn path=/trunk/; revision=1178

ChangeLog
libsoup/soup-message-queue.c
libsoup/soup-message-queue.h
libsoup/soup-session-async.c
libsoup/soup-session-sync.c
libsoup/soup-session.c

index b3f30ee..32945ae 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,17 @@
+2008-10-03  Dan Winship  <danw@gnome.org>
+
+       * libsoup/soup-message-queue.c: Make this more complicated, with a
+       SoupMessageQueueItem to keep track of the session's per-message
+       state. (Part of the process of moving session-related state out of
+       SoupMessagePrivate.)
+
+       * libsoup/soup-session.c: Update to work in terms of
+       SoupMessageQueueItem
+
+       * libsoup/soup-session-async.c:
+       * libsoup/soup-session-sync.c: use SoupMessageQueueItem (and get
+       rid of SoupSessionAsyncQueueData and SoupSessionSyncAsyncData).
+
 2008-10-01  Dan Winship  <danw@gnome.org>
 
        * libsoup/soup-multipart.c: New type and methods for working with
index a73749a..b616b77 100644 (file)
@@ -2,7 +2,8 @@
 /*
  * soup-message-queue.c: Message queue
  *
- * Copyright (C) 2003, Ximian, Inc.
+ * Copyright (C) 2003 Novell, Inc.
+ * Copyright (C) 2008 Red Hat, Inc.
  */
 
 #ifdef HAVE_CONFIG_H
 
 #include "soup-message-queue.h"
 
+/**
+ * SECTION:soup-message-queue
+ *
+ * This is an internal structure used by #SoupSession and its
+ * subclasses to keep track of the status of messages currently being
+ * processed.
+ *
+ * The #SoupMessageQueue itself is mostly just a linked list of
+ * #SoupMessageQueueItem, with some added cleverness to allow the list
+ * to be walked safely while other threads / re-entrant loops are
+ * adding items to and removing items from it. In particular, this is
+ * handled by refcounting items and then keeping "removed" items in
+ * the list until their ref_count drops to 0, but skipping over the
+ * "removed" ones when walking the queue.
+ **/
+
 struct SoupMessageQueue {
-       GList *head, *tail;
-       GList *iters;
+       SoupSession *session;
 
        GMutex *mutex;
+       SoupMessageQueueItem *head, *tail;
 };
 
-/**
- * soup_message_queue_new:
- *
- * Creates a new #SoupMessageQueue
- *
- * Return value: a new #SoupMessageQueue object
- **/
 SoupMessageQueue *
-soup_message_queue_new (void)
+soup_message_queue_new (SoupSession *session)
 {
        SoupMessageQueue *queue;
 
        queue = g_slice_new0 (SoupMessageQueue);
+       queue->session = session;
        queue->mutex = g_mutex_new ();
        return queue;
 }
 
-/**
- * soup_message_queue_destroy:
- * @queue: a message queue
- *
- * Frees memory associated with @queue, which must be empty.
- **/
 void
 soup_message_queue_destroy (SoupMessageQueue *queue)
 {
        g_return_if_fail (queue->head == NULL);
 
-       g_list_free (queue->head);
-       g_list_free (queue->iters);
        g_mutex_free (queue->mutex);
        g_slice_free (SoupMessageQueue, queue);
 }
 
 /**
  * soup_message_queue_append:
- * @queue: a queue
- * @msg: a message
+ * @queue: a #SoupMessageQueue
+ * @msg: a #SoupMessage
+ * @callback: the callback for @msg
+ * @user_data: the data to pass to @callback
+ *
+ * Creates a new #SoupMessageQueueItem and appends it to @queue.
  *
- * Appends @msg to the end of @queue
+ * Return value: the new item, which you must unref with
+ * soup_message_queue_unref_item() when you are done with.
  **/
-void
-soup_message_queue_append (SoupMessageQueue *queue, SoupMessage *msg)
+SoupMessageQueueItem *
+soup_message_queue_append (SoupMessageQueue *queue, SoupMessage *msg,
+                          SoupSessionCallback callback, gpointer user_data)
 {
+       SoupMessageQueueItem *item;
+
+       item = g_slice_new0 (SoupMessageQueueItem);
+       item->session = queue->session;
+       item->queue = queue;
+       item->msg = g_object_ref (msg);
+       item->callback = callback;
+       item->callback_data = user_data;
+
+       /* Note: the initial ref_count of 1 represents the caller's
+        * ref; the queue's own ref is indicated by the absence of the
+        * "removed" flag.
+        */
+       item->ref_count = 1;
+
        g_mutex_lock (queue->mutex);
        if (queue->head) {
-               queue->tail = g_list_append (queue->tail, msg);
-               queue->tail = queue->tail->next;
+               queue->tail->next = item;
+               item->prev = queue->tail;
+               queue->tail = item;
        } else
-               queue->head = queue->tail = g_list_append (NULL, msg);
+               queue->head = queue->tail = item;
 
-       g_object_add_weak_pointer (G_OBJECT (msg), &queue->tail->data);
        g_mutex_unlock (queue->mutex);
+       return item;
 }
 
 /**
- * soup_message_queue_first:
- * @queue: a queue
- * @iter: pointer to a #SoupMessageQueueIter
- *
- * Initializes @iter and returns the first element of @queue. If you
- * do not iterate all the way to the end of the list, you must call
- * soup_message_queue_free_iter() to dispose the iterator when you are
- * done.
+ * soup_message_queue_item_ref:
+ * @item: a #SoupMessageQueueItem
  *
- * Return value: the first element of @queue, or %NULL if it is empty.
- **/
-SoupMessage *
-soup_message_queue_first (SoupMessageQueue *queue, SoupMessageQueueIter *iter)
+ * Refs @item.
+ **/ 
+void
+soup_message_queue_item_ref (SoupMessageQueueItem *item)
 {
-       g_mutex_lock (queue->mutex);
+       item->ref_count++;
+}
 
-       if (!queue->head) {
-               g_mutex_unlock (queue->mutex);
-               return NULL;
+/**
+ * soup_message_queue_item_unref:
+ * @item: a #SoupMessageQueueItem
+ *
+ * Unrefs @item; use this on a #SoupMessageQueueItem that you are done
+ * with (but that you aren't passing to
+ * soup_message_queue_item_next()).
+ **/ 
+void
+soup_message_queue_item_unref (SoupMessageQueueItem *item)
+{
+       g_mutex_lock (item->queue->mutex);
+
+       /* Decrement the ref_count; if it's still non-zero OR if the
+        * item is still in the queue, then return.
+        */
+       if (--item->ref_count || !item->removed) {
+               g_mutex_unlock (item->queue->mutex);
+               return;
        }
 
-       queue->iters = g_list_prepend (queue->iters, iter);
-
-       iter->cur = NULL;
-       iter->next = queue->head;
-       g_mutex_unlock (queue->mutex);
-
-       return soup_message_queue_next (queue, iter);
+       /* OK, @item is dead. Rewrite @queue around it */
+       if (item->prev)
+               item->prev->next = item->next;
+       else
+               item->queue->head = item->next;
+       if (item->next)
+               item->next->prev = item->prev;
+       else
+               item->queue->tail = item->prev;
+
+       g_mutex_unlock (item->queue->mutex);
+
+       /* And free it */
+       g_object_unref (item->msg);
+       g_slice_free (SoupMessageQueueItem, item);
 }
 
-static SoupMessage *
-queue_remove_internal (SoupMessageQueue *queue, SoupMessageQueueIter *iter)
+/**
+ * soup_message_queue_lookup:
+ * @queue: a #SoupMessageQueue
+ * @msg: a #SoupMessage
+ *
+ * Finds the #SoupMessageQueueItem for @msg in @queue. You must unref
+ * the item with soup_message_queue_unref_item() when you are done
+ * with it.
+ *
+ * Return value: the queue item for @msg, or %NULL
+ **/ 
+SoupMessageQueueItem *
+soup_message_queue_lookup (SoupMessageQueue *queue, SoupMessage *msg)
 {
-       GList *i;
-       SoupMessageQueueIter *iter2;
-       SoupMessage *msg;
-
-       if (!iter->cur) {
-               /* We're at end of list or this item was already removed */
-               return NULL;
-       }
-
-       /* Fix any other iters pointing to iter->cur */
-       for (i = queue->iters; i; i = i->next) {
-               iter2 = i->data;
-               if (iter2 != iter) {
-                       if (iter2->cur == iter->cur)
-                               iter2->cur = NULL;
-                       else if (iter2->next == iter->cur)
-                               iter2->next = iter->cur->next;
-               }
-       }
+       SoupMessageQueueItem *item;
 
-       msg = iter->cur->data;
-       if (msg)
-               g_object_remove_weak_pointer (G_OBJECT (msg), &iter->cur->data);
+       g_mutex_lock (queue->mutex);
 
-       /* If deleting the last item, fix tail */
-       if (queue->tail == iter->cur)
-               queue->tail = queue->tail->prev;
+       item = queue->tail;
+       while (item && (item->removed || item->msg != msg))
+               item = item->prev;
 
-       /* Remove the item */
-       queue->head = g_list_delete_link (queue->head, iter->cur);
-       iter->cur = NULL;
+       if (item)
+               item->ref_count++;
 
-       return msg;
+       g_mutex_unlock (queue->mutex);
+       return item;
 }
 
 /**
- * soup_message_queue_next:
- * @queue: a queue
- * @iter: pointer to an initialized #SoupMessageQueueIter
+ * soup_message_queue_first:
+ * @queue: a #SoupMessageQueue
  *
- * Returns the next element of @queue
+ * Gets the first item in @queue. You must unref the item by calling
+ * soup_message_queue_unref_item() on it when you are done.
+ * (soup_message_queue_next() does this for you automatically, so you
+ * only need to unref the item yourself if you are not going to
+ * finishing walking the queue.)
  *
- * Return value: the next element, or %NULL if there are no more.
- **/
-SoupMessage *
-soup_message_queue_next (SoupMessageQueue *queue, SoupMessageQueueIter *iter)
+ * Return value: the first item in @queue.
+ **/ 
+SoupMessageQueueItem *
+soup_message_queue_first (SoupMessageQueue *queue)
 {
-       g_mutex_lock (queue->mutex);
+       SoupMessageQueueItem *item;
 
-       while (iter->next) {
-               iter->cur = iter->next;
-               iter->next = iter->cur->next;
-               if (iter->cur->data) {
-                       g_mutex_unlock (queue->mutex);
-                       return iter->cur->data;
-               }
+       g_mutex_lock (queue->mutex);
 
-               /* Message was finalized, remove dead queue element */
-               queue_remove_internal (queue, iter);
-       }
+       item = queue->head;
+       while (item && item->removed)
+               item = item->next;
 
-       /* Nothing left */
-       iter->cur = NULL;
-       queue->iters = g_list_remove (queue->iters, iter);
+       if (item)
+               item->ref_count++;
 
        g_mutex_unlock (queue->mutex);
-       return NULL;
+       return item;
 }
 
 /**
- * soup_message_queue_remove:
- * @queue: a queue
- * @iter: pointer to an initialized #SoupMessageQueueIter
+ * soup_message_queue_next:
+ * @queue: a #SoupMessageQueue
+ * @item: a #SoupMessageQueueItem
  *
- * Removes the queue element pointed to by @iter; that is, the last
- * message returned by soup_message_queue_first() or
- * soup_message_queue_next().
+ * Unrefs @item and gets the next item after it in @queue. As with
+ * soup_message_queue_first(), you must unref the returned item
+ * yourself with soup_message_queue_unref_item() if you do not finish
+ * walking the queue.
  *
- * Return value: the removed message, or %NULL if the element pointed
- * to by @iter was already removed.
- **/
-SoupMessage *
-soup_message_queue_remove (SoupMessageQueue *queue, SoupMessageQueueIter *iter)
+ * Return value: the next item in @queue.
+ **/ 
+SoupMessageQueueItem *
+soup_message_queue_next (SoupMessageQueue *queue, SoupMessageQueueItem *item)
 {
-       SoupMessage *msg;
+       SoupMessageQueueItem *next;
 
        g_mutex_lock (queue->mutex);
-       msg = queue_remove_internal (queue, iter);
-       g_mutex_unlock (queue->mutex);
 
-       return msg;
-}
+       next = item->next;
+       while (next && next->removed)
+               next = next->next;
+       if (next)
+               next->ref_count++;
 
-/**
- * soup_message_queue_remove_message:
- * @queue: a queue
- * @msg: a #SoupMessage
- *
- * Removes the indicated message from @queue.
- **/
-void
-soup_message_queue_remove_message (SoupMessageQueue *queue, SoupMessage *msg)
-{
-       SoupMessageQueueIter iter;
-       SoupMessage *msg2;
-
-       for (msg2 = soup_message_queue_first (queue, &iter); msg2; msg2 = soup_message_queue_next (queue, &iter)) {
-               if (msg2 == msg) {
-                       soup_message_queue_remove (queue, &iter);
-                       soup_message_queue_free_iter (queue, &iter);
-                       return;
-               }
-       }
+       g_mutex_unlock (queue->mutex);
+       soup_message_queue_item_unref (item);
+       return next;
 }
 
-
 /**
- * soup_message_queue_free_iter:
- * @queue: a queue
- * @iter: pointer to an initialized #SoupMessageQueueIter
+ * soup_message_queue_remove:
+ * @queue: a #SoupMessageQueue
+ * @item: a #SoupMessageQueueItem
  *
- * Removes @iter from the list of active iterators in @queue.
- **/
+ * Removes @item from @queue. Note that you probably also need to call
+ * soup_message_queue_unref_item() after this.
+ **/ 
 void
-soup_message_queue_free_iter (SoupMessageQueue *queue,
-                             SoupMessageQueueIter *iter)
+soup_message_queue_remove (SoupMessageQueue *queue, SoupMessageQueueItem *item)
 {
        g_mutex_lock (queue->mutex);
-       queue->iters = g_list_remove (queue->iters, iter);
+       item->removed = TRUE;
        g_mutex_unlock (queue->mutex);
 }
index 24e8ddb..1061581 100644 (file)
@@ -1,6 +1,7 @@
 /* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
 /*
- * Copyright (C) 2003, Ximian, Inc.
+ * Copyright (C) 2003 Novell, Inc.
+ * Copyright (C) 2008 Red Hat, Inc.
  */
 
 #ifndef SOUP_MESSAGE_QUEUE_H
@@ -8,33 +9,47 @@
 
 #include <glib.h>
 #include <libsoup/soup-message.h>
+#include <libsoup/soup-session.h>
 
 G_BEGIN_DECLS
 
 typedef struct SoupMessageQueue SoupMessageQueue; 
-
-typedef struct {
-       GList *cur, *next;
-} SoupMessageQueueIter;
-
-SoupMessageQueue *soup_message_queue_new        (void);
-void              soup_message_queue_append     (SoupMessageQueue     *queue,
-                                                SoupMessage          *msg);
-
-SoupMessage      *soup_message_queue_first      (SoupMessageQueue     *queue,
-                                                SoupMessageQueueIter *iter);
-SoupMessage      *soup_message_queue_next       (SoupMessageQueue     *queue,
-                                                SoupMessageQueueIter *iter);
-SoupMessage      *soup_message_queue_remove     (SoupMessageQueue     *queue,
-                                                SoupMessageQueueIter *iter);
-
-void              soup_message_queue_free_iter  (SoupMessageQueue     *queue,
-                                                SoupMessageQueueIter *iter);
-
-void              soup_message_queue_destroy    (SoupMessageQueue     *queue);
-
-void              soup_message_queue_remove_message (SoupMessageQueue *queue,
-                                                    SoupMessage      *msg);
+typedef struct SoupMessageQueueItem SoupMessageQueueItem;
+
+struct SoupMessageQueueItem {
+       /*< public >*/
+       SoupSession *session;
+       SoupMessageQueue *queue;
+       SoupMessage *msg;
+       SoupSessionCallback callback;
+       gpointer callback_data;
+
+       /*< private >*/
+       guint removed              : 1;
+       guint ref_count            : 31;
+       SoupMessageQueueItem *prev, *next;
+};
+
+SoupMessageQueue     *soup_message_queue_new        (SoupSession          *session);
+SoupMessageQueueItem *soup_message_queue_append     (SoupMessageQueue     *queue,
+                                                    SoupMessage          *msg,
+                                                    SoupSessionCallback   callback,
+                                                    gpointer              user_data);
+
+SoupMessageQueueItem *soup_message_queue_lookup     (SoupMessageQueue     *queue,
+                                                    SoupMessage          *msg);
+
+SoupMessageQueueItem *soup_message_queue_first      (SoupMessageQueue     *queue);
+SoupMessageQueueItem *soup_message_queue_next       (SoupMessageQueue     *queue,
+                                                    SoupMessageQueueItem *item);
+
+void                  soup_message_queue_remove     (SoupMessageQueue     *queue,
+                                                    SoupMessageQueueItem *item);
+
+void                  soup_message_queue_item_ref   (SoupMessageQueueItem *item);
+void                  soup_message_queue_item_unref (SoupMessageQueueItem *item);
+
+void                  soup_message_queue_destroy    (SoupMessageQueue     *queue);
 
 G_END_DECLS
 
index c8624a3..d3f229b 100644 (file)
@@ -24,7 +24,7 @@
  **/
 
 static gboolean run_queue (SoupSessionAsync *sa);
-static void do_idle_run_queue (SoupSessionAsync *sa);
+static void do_idle_run_queue (SoupSession *session);
 
 static void  queue_message   (SoupSession *session, SoupMessage *req,
                              SoupSessionCallback callback, gpointer user_data);
@@ -108,22 +108,22 @@ soup_session_async_new_with_options (const char *optname1, ...)
 
 
 static void
-connection_closed (SoupConnection *conn, gpointer sa)
+connection_closed (SoupConnection *conn, gpointer session)
 {
        /* Run the queue in case anyone was waiting for a connection
         * to be closed.
         */
-       do_idle_run_queue (sa);
+       do_idle_run_queue (session);
 }
 
 static void
 got_connection (SoupConnection *conn, guint status, gpointer user_data)
 {
-       SoupSessionAsync *sa = user_data;
+       SoupSession *session = user_data;
 
        if (status == SOUP_STATUS_OK) {
                g_signal_connect (conn, "disconnected",
-                                 G_CALLBACK (connection_closed), sa);
+                                 G_CALLBACK (connection_closed), session);
 
                /* @conn has been marked reserved by SoupSession, but
                 * we don't actually have any specific message in mind
@@ -142,8 +142,8 @@ got_connection (SoupConnection *conn, guint status, gpointer user_data)
         * there may have been messages waiting for the connection
         * count to go down.
         */
-       do_idle_run_queue (sa);
-       g_object_unref (sa);
+       do_idle_run_queue (session);
+       g_object_unref (session);
 }
 
 static gboolean
@@ -151,7 +151,7 @@ run_queue (SoupSessionAsync *sa)
 {
        SoupSession *session = SOUP_SESSION (sa);
        SoupMessageQueue *queue = soup_session_get_queue (session);
-       SoupMessageQueueIter iter;
+       SoupMessageQueueItem *item;
        SoupMessage *msg;
        SoupConnection *conn;
        gboolean try_pruning = TRUE, should_prune = FALSE;
@@ -160,9 +160,10 @@ run_queue (SoupSessionAsync *sa)
        /* FIXME: prefer CONNECTING messages */
 
  try_again:
-       for (msg = soup_message_queue_first (queue, &iter);
-            msg && !should_prune;
-            msg = soup_message_queue_next (queue, &iter)) {
+       for (item = soup_message_queue_first (queue);
+            item && !should_prune;
+            item = soup_message_queue_next (queue, item)) {
+               msg = item->msg;
 
                if (!SOUP_MESSAGE_IS_STARTING (msg) ||
                    soup_message_io_in_progress (msg))
@@ -179,6 +180,8 @@ run_queue (SoupSessionAsync *sa)
                } else
                        soup_connection_send_request (conn, msg);
        }
+       if (item)
+               soup_message_queue_item_unref (item);
 
        if (try_pruning && should_prune) {
                /* There is at least one message in the queue that
@@ -186,7 +189,7 @@ run_queue (SoupSessionAsync *sa)
                 * some other server.
                 */
                if (soup_session_try_prune_connection (session)) {
-                       try_pruning = FALSE;
+                       try_pruning = should_prune = FALSE;
                        goto try_again;
                }
        }
@@ -200,32 +203,25 @@ request_restarted (SoupMessage *req, gpointer sa)
        run_queue (sa);
 }
 
-typedef struct {
-       SoupSessionAsync *sa;
-       SoupSessionCallback callback;
-       gpointer callback_data;
-} SoupSessionAsyncQueueData;
-
 static void
 final_finished (SoupMessage *req, gpointer user_data)
 {
-       SoupSessionAsyncQueueData *saqd = user_data;
-       SoupSessionAsync *sa = saqd->sa;
+       SoupMessageQueueItem *item = user_data;
+       SoupSession *session = item->session;
+
+       g_object_ref (session);
 
-       g_object_ref (sa);
        if (!SOUP_MESSAGE_IS_STARTING (req)) {
-               g_signal_handlers_disconnect_by_func (req, final_finished, saqd);
-               if (saqd->callback) {
-                       saqd->callback ((SoupSession *)sa, req,
-                                       saqd->callback_data);
-               }
+               g_signal_handlers_disconnect_by_func (req, final_finished, item);
+               if (item->callback)
+                       item->callback (session, req, item->callback_data);
 
                g_object_unref (req);
-               g_slice_free (SoupSessionAsyncQueueData, saqd);
+               soup_message_queue_item_unref (item);
        }
 
-       do_idle_run_queue (sa);
-       g_object_unref (sa);
+       do_idle_run_queue (session);
+       g_object_unref (session);
 }
 
 static gboolean
@@ -239,14 +235,14 @@ idle_run_queue (gpointer sa)
 }
 
 static void
-do_idle_run_queue (SoupSessionAsync *sa)
+do_idle_run_queue (SoupSession *session)
 {
-       SoupSessionAsyncPrivate *priv = SOUP_SESSION_ASYNC_GET_PRIVATE (sa);
+       SoupSessionAsyncPrivate *priv = SOUP_SESSION_ASYNC_GET_PRIVATE (session);
 
        if (!priv->idle_run_queue_source) {
                priv->idle_run_queue_source = soup_add_completion (
-                       soup_session_get_async_context ((SoupSession *)sa),
-                       idle_run_queue, sa);
+                       soup_session_get_async_context (session),
+                       idle_run_queue, session);
        }
 }
 
@@ -254,21 +250,19 @@ static void
 queue_message (SoupSession *session, SoupMessage *req,
               SoupSessionCallback callback, gpointer user_data)
 {
-       SoupSessionAsync *sa = SOUP_SESSION_ASYNC (session);
-       SoupSessionAsyncQueueData *saqd;
+       SoupMessageQueueItem *item;
 
-       g_signal_connect (req, "restarted",
-                         G_CALLBACK (request_restarted), sa);
+       SOUP_SESSION_CLASS (soup_session_async_parent_class)->queue_message (session, req, callback, user_data);
+
+       item = soup_message_queue_lookup (soup_session_get_queue (session), req);
+       g_return_if_fail (item != NULL);
 
-       saqd = g_slice_new (SoupSessionAsyncQueueData);
-       saqd->sa = sa;
-       saqd->callback = callback;
-       saqd->callback_data = user_data;
+       g_signal_connect (req, "restarted",
+                         G_CALLBACK (request_restarted), session);
        g_signal_connect_after (req, "finished",
-                               G_CALLBACK (final_finished), saqd);
+                               G_CALLBACK (final_finished), item);
 
-       SOUP_SESSION_CLASS (soup_session_async_parent_class)->queue_message (session, req, callback, user_data);
-       do_idle_run_queue (sa);
+       do_idle_run_queue (session);
 }
 
 static guint
index 44e7237..9ecac2a 100644 (file)
@@ -125,61 +125,6 @@ soup_session_sync_new_with_options (const char *optname1, ...)
        return session;
 }
 
-typedef struct {
-       SoupSession *session;
-       SoupMessage *msg;
-       SoupSessionCallback callback;
-       gpointer user_data;
-} SoupSessionSyncAsyncData;
-
-static void
-async_data_free (SoupSessionSyncAsyncData *sad)
-{
-       g_object_unref (sad->session);
-       g_object_unref (sad->msg);
-       g_slice_free (SoupSessionSyncAsyncData, sad);
-}
-
-static gboolean
-queue_message_callback (gpointer data)
-{
-       SoupSessionSyncAsyncData *sad = data;
-
-       sad->callback (sad->session, sad->msg, sad->user_data);
-       async_data_free (sad);
-       return FALSE;
-}
-
-static gpointer
-queue_message_thread (gpointer data)
-{
-       SoupSessionSyncAsyncData *sad = data;
-
-       soup_session_send_message (sad->session, sad->msg);
-       if (sad->callback) {
-               soup_add_completion (soup_session_get_async_context (sad->session),
-                                    queue_message_callback, sad);
-       } else
-               async_data_free (sad);
-
-       return NULL;
-}
-
-static void
-queue_message (SoupSession *session, SoupMessage *msg,
-              SoupSessionCallback callback, gpointer user_data)
-{
-       SoupSessionSyncAsyncData *sad;
-
-       sad = g_slice_new (SoupSessionSyncAsyncData);
-       sad->session = g_object_ref (session);
-       sad->msg = g_object_ref (msg);
-       sad->callback = callback;
-       sad->user_data = user_data;
-
-       g_thread_create (queue_message_thread, sad, FALSE, NULL);
-}
-
 static SoupConnection *
 wait_for_connection (SoupSession *session, SoupMessage *msg)
 {
@@ -235,25 +180,84 @@ wait_for_connection (SoupSession *session, SoupMessage *msg)
        goto try_again;
 }
 
-static guint
-send_message (SoupSession *session, SoupMessage *msg)
+static void
+process_queue_item (SoupMessageQueueItem *item)
 {
-       SoupSessionSyncPrivate *priv = SOUP_SESSION_SYNC_GET_PRIVATE (session);
+       SoupSessionSyncPrivate *priv = SOUP_SESSION_SYNC_GET_PRIVATE (item->session);
+       SoupMessage *msg = item->msg;
        SoupConnection *conn;
 
-       SOUP_SESSION_CLASS (soup_session_sync_parent_class)->queue_message (session, msg, NULL, NULL);
-
        do {
                /* Get a connection */
-               conn = wait_for_connection (session, msg);
+               conn = wait_for_connection (item->session, msg);
                if (!conn)
-                       return msg->status_code;
+                       break;
 
                soup_connection_send_request (conn, msg);
                g_cond_broadcast (priv->cond);
        } while (soup_message_get_io_status (msg) != SOUP_MESSAGE_IO_STATUS_FINISHED);
 
-       return msg->status_code;
+       soup_message_queue_remove (item->queue, item);
+}
+
+static gboolean
+queue_message_callback (gpointer data)
+{
+       SoupMessageQueueItem *item = data;
+
+       item->callback (item->session, item->msg, item->callback_data);
+       g_object_unref (item->session);
+       soup_message_queue_item_unref (item);
+       return FALSE;
+}
+
+static gpointer
+queue_message_thread (gpointer data)
+{
+       SoupMessageQueueItem *item = data;
+
+       process_queue_item (item);
+       if (item->callback) {
+               soup_add_completion (soup_session_get_async_context (item->session),
+                                    queue_message_callback, item);
+       } else {
+               g_object_unref (item->session);
+               soup_message_queue_item_unref (item);
+       }
+
+       return NULL;
+}
+
+static void
+queue_message (SoupSession *session, SoupMessage *msg,
+              SoupSessionCallback callback, gpointer user_data)
+{
+       SoupMessageQueueItem *item;
+
+       SOUP_SESSION_CLASS (soup_session_sync_parent_class)->
+               queue_message (g_object_ref (session), msg, callback, user_data);
+
+       item = soup_message_queue_lookup (soup_session_get_queue (session), msg);
+       g_return_if_fail (item != NULL);
+
+       g_thread_create (queue_message_thread, item, FALSE, NULL);
+}
+
+static guint
+send_message (SoupSession *session, SoupMessage *msg)
+{
+       SoupMessageQueueItem *item;
+       guint status;
+
+       SOUP_SESSION_CLASS (soup_session_sync_parent_class)->queue_message (session, msg, NULL, NULL);
+
+       item = soup_message_queue_lookup (soup_session_get_queue (session), msg);
+       g_return_val_if_fail (item != NULL, SOUP_STATUS_MALFORMED);
+
+       process_queue_item (item);
+       status = msg->status_code;
+       soup_message_queue_item_unref (item);
+       return status;
 }
 
 static void
index 3107ad1..417591c 100644 (file)
@@ -158,7 +158,7 @@ soup_session_init (SoupSession *session)
 {
        SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
 
-       priv->queue = soup_message_queue_new ();
+       priv->queue = soup_message_queue_new (session);
 
        priv->host_lock = g_mutex_new ();
        priv->hosts = g_hash_table_new (soup_uri_host_hash,
@@ -920,7 +920,7 @@ connect_result (SoupConnection *conn, guint status, gpointer user_data)
        SoupSession *session = user_data;
        SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
        SoupSessionHost *host;
-       SoupMessageQueueIter iter;
+       SoupMessageQueueItem *item;
        SoupMessage *msg;
 
        g_mutex_lock (priv->host_lock);
@@ -961,7 +961,8 @@ connect_result (SoupConnection *conn, guint status, gpointer user_data)
         * of luck.
         */
        g_object_ref (session);
-       for (msg = soup_message_queue_first (priv->queue, &iter); msg; msg = soup_message_queue_next (priv->queue, &iter)) {
+       for (item = soup_message_queue_first (priv->queue); item; item = soup_message_queue_next (priv->queue, item)) {
+               msg = item->msg;
                if (get_host_for_message (session, msg) == host) {
                        if (status == SOUP_STATUS_TRY_AGAIN) {
                                if (soup_message_get_io_status (msg) == SOUP_MESSAGE_IO_STATUS_CONNECTING)
@@ -1101,14 +1102,16 @@ soup_session_get_queue (SoupSession *session)
 static void
 message_finished (SoupMessage *msg, gpointer user_data)
 {
-       SoupSession *session = user_data;
+       SoupMessageQueueItem *item = user_data;
+       SoupSession *session = item->session;
        SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
 
        if (!SOUP_MESSAGE_IS_STARTING (msg)) {
-               soup_message_queue_remove_message (priv->queue, msg);
+               soup_message_queue_remove (priv->queue, item);
                g_signal_handlers_disconnect_by_func (msg, message_finished, session);
                g_signal_handlers_disconnect_by_func (msg, redirect_handler, session);
                g_signal_emit (session, signals[REQUEST_UNQUEUED], 0, msg);
+               soup_message_queue_item_unref (item);
        }
 }
 
@@ -1117,9 +1120,13 @@ queue_message (SoupSession *session, SoupMessage *msg,
               SoupSessionCallback callback, gpointer user_data)
 {
        SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
+       SoupMessageQueueItem *item;
+
+       item = soup_message_queue_append (priv->queue, msg, callback, user_data);
+       soup_message_set_io_status (msg, SOUP_MESSAGE_IO_STATUS_QUEUED);
 
        g_signal_connect_after (msg, "finished",
-                               G_CALLBACK (message_finished), session);
+                               G_CALLBACK (message_finished), item);
 
        if (!(soup_message_get_flags (msg) & SOUP_MESSAGE_NO_REDIRECT)) {
                soup_message_add_header_handler (
@@ -1127,9 +1134,6 @@ queue_message (SoupSession *session, SoupMessage *msg,
                        G_CALLBACK (redirect_handler), session);
        }
 
-       soup_message_set_io_status (msg, SOUP_MESSAGE_IO_STATUS_QUEUED);
-       soup_message_queue_append (priv->queue, msg);
-
        g_signal_emit (session, signals[REQUEST_QUEUED], 0, msg);
 }
 
@@ -1263,8 +1267,14 @@ static void
 cancel_message (SoupSession *session, SoupMessage *msg, guint status_code)
 {
        SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
+       SoupMessageQueueItem *item;
+
+       item = soup_message_queue_lookup (priv->queue, msg);
+       if (item) {
+               soup_message_queue_remove (priv->queue, item);
+               soup_message_queue_item_unref (item);
+       }
 
-       soup_message_queue_remove_message (priv->queue, msg);
        soup_message_io_stop (msg);
        soup_message_set_status (msg, status_code);
        soup_message_finished (msg);
@@ -1297,7 +1307,7 @@ gather_conns (gpointer key, gpointer host, gpointer data)
        SoupConnection *conn = key;
        GSList **conns = data;
 
-       *conns = g_slist_prepend (*conns, conn);
+       *conns = g_slist_prepend (*conns, g_object_ref (conn));
 }
 
 /**
@@ -1310,17 +1320,16 @@ void
 soup_session_abort (SoupSession *session)
 {
        SoupSessionPrivate *priv;
-       SoupMessageQueueIter iter;
-       SoupMessage *msg;
+       SoupMessageQueueItem *item;
        GSList *conns, *c;
 
        g_return_if_fail (SOUP_IS_SESSION (session));
        priv = SOUP_SESSION_GET_PRIVATE (session);
 
-       for (msg = soup_message_queue_first (priv->queue, &iter);
-            msg;
-            msg = soup_message_queue_next (priv->queue, &iter)) {
-               soup_session_cancel_message (session, msg,
+       for (item = soup_message_queue_first (priv->queue);
+            item;
+            item = soup_message_queue_next (priv->queue, item)) {
+               soup_session_cancel_message (session, item->msg,
                                             SOUP_STATUS_CANCELLED);
        }
 
@@ -1329,8 +1338,6 @@ soup_session_abort (SoupSession *session)
        conns = NULL;
        g_hash_table_foreach (priv->conns, gather_conns, &conns);
 
-       for (c = conns; c; c = c->next)
-               g_object_ref (c->data);
        g_mutex_unlock (priv->host_lock);
        for (c = conns; c; c = c->next) {
                soup_connection_disconnect (c->data);