Lots of thread-safety stuff, primarly so you can disconnect a socket from
authorDan Winship <danw@src.gnome.org>
Mon, 22 Dec 2003 18:09:34 +0000 (18:09 +0000)
committerDan Winship <danw@src.gnome.org>
Mon, 22 Dec 2003 18:09:34 +0000 (18:09 +0000)
* libsoup/soup-socket.c: Lots of thread-safety stuff, primarly so
you can disconnect a socket from one thread while doing I/O in
another.

* libsoup/soup-message-io.c (soup_message_io_cancel): Split into
soup_message_io_stop() and io_cleanup(), to separate out the "stop
reading/writing" and "free data" phases to allow thread-safe
synchronous cancellation.
(soup_message_io_finished): call both soup_message_io_stop() and
io_cleanup()
(io_error): Only set SOUP_STATUS_IO_ERROR on the message if it
doesn't already have a transport error status (eg, CANCELLED).
(new_iostate): Call io_cleanup() if needed.

* libsoup/soup-status.h: add "SOUP_STATUS_NONE" for 0, to make it
clearer that it's not a status.

* libsoup/soup-message.c (finalize, restarted, finished,
soup_message_set_uri): s/soup_message_io_cancel/soup_message_io_stop/
(soup_message_cleanup_response): s/0/SOUP_STATUS_NONE/

* libsoup/soup-connection.c (send_request): Remove
soup_message_io_cancel call.

* libsoup/soup-session-sync.c (send_message): Connect to the
connection's "disconnected" signal rather than using a weak ref,
since that's what we really care about, and it's possible that the
connection may have an extra ref on it somewhere that would keep
it from being destroyed even if it was disconnected.

ChangeLog
libsoup/soup-connection.c
libsoup/soup-message-io.c
libsoup/soup-message.c
libsoup/soup-message.h
libsoup/soup-session-sync.c
libsoup/soup-socket.c
libsoup/soup-status.h

index eb4fd09..3c53b45 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,35 @@
+2003-12-22  Dan Winship  <danw@ximian.com>
+
+       * libsoup/soup-socket.c: Lots of thread-safety stuff, primarly so
+       you can disconnect a socket from one thread while doing I/O in
+       another.
+
+       * libsoup/soup-message-io.c (soup_message_io_cancel): Split into
+       soup_message_io_stop() and io_cleanup(), to separate out the "stop
+       reading/writing" and "free data" phases to allow thread-safe
+       synchronous cancellation.
+       (soup_message_io_finished): call both soup_message_io_stop() and
+       io_cleanup()
+       (io_error): Only set SOUP_STATUS_IO_ERROR on the message if it
+       doesn't already have a transport error status (eg, CANCELLED).
+       (new_iostate): Call io_cleanup() if needed.
+
+       * libsoup/soup-status.h: add "SOUP_STATUS_NONE" for 0, to make it
+       clearer that it's not a status.
+
+       * libsoup/soup-message.c (finalize, restarted, finished,
+       soup_message_set_uri): s/soup_message_io_cancel/soup_message_io_stop/
+       (soup_message_cleanup_response): s/0/SOUP_STATUS_NONE/
+
+       * libsoup/soup-connection.c (send_request): Remove
+       soup_message_io_cancel call.
+
+       * libsoup/soup-session-sync.c (send_message): Connect to the
+       connection's "disconnected" signal rather than using a weak ref,
+       since that's what we really care about, and it's possible that the
+       connection may have an extra ref on it somewhere that would keep
+       it from being destroyed even if it was disconnected.
+
 2003-12-20  Joe Shaw  <joe@ximian.com>
 
        * libsoup/soup-session.c (lookup_auth): If const_path is NULL un
index 5e3d3bd..d2ca402 100644 (file)
@@ -625,7 +625,6 @@ send_request (SoupConnection *conn, SoupMessage *req)
                        soup_message_filter_setup_message (conn->priv->filter, req);
        }
 
-       soup_message_io_cancel (req);
        soup_message_send_request (req, conn->priv->socket,
                                   conn->priv->proxy_uri != NULL);
 }
index 74c8c03..dc6e30a 100644 (file)
@@ -68,24 +68,16 @@ typedef struct {
 
 #define RESPONSE_BLOCK_SIZE 8192
 
-void
-soup_message_io_cancel (SoupMessage *msg)
+static void
+io_cleanup (SoupMessage *msg)
 {
        SoupMessageIOData *io = msg->priv->io_data;
 
        if (!io)
                return;
 
-       if (io->read_tag)
-               g_signal_handler_disconnect (io->sock, io->read_tag);
-       if (io->write_tag)
-               g_signal_handler_disconnect (io->sock, io->write_tag);
-       if (io->err_tag)
-               g_signal_handler_disconnect (io->sock, io->err_tag);
-
-       if (io->read_state != SOUP_MESSAGE_IO_STATE_DONE)
-               soup_socket_disconnect (io->sock);
-       g_object_unref (io->sock);
+       if (io->sock)
+               g_object_unref (io->sock);
 
        if (io->read_buf)
                g_byte_array_free (io->read_buf, TRUE);
@@ -97,6 +89,31 @@ soup_message_io_cancel (SoupMessage *msg)
        msg->priv->io_data = NULL;
 }
 
+void
+soup_message_io_stop (SoupMessage *msg)
+{
+       SoupMessageIOData *io = msg->priv->io_data;
+
+       if (!io)
+               return;
+
+       if (io->read_tag) {
+               g_signal_handler_disconnect (io->sock, io->read_tag);
+               io->read_tag = 0;
+       }
+       if (io->write_tag) {
+               g_signal_handler_disconnect (io->sock, io->write_tag);
+               io->write_tag = 0;
+       }
+       if (io->err_tag) {
+               g_signal_handler_disconnect (io->sock, io->err_tag);
+               io->err_tag = 0;
+       }
+
+       if (io->read_state != SOUP_MESSAGE_IO_STATE_DONE)
+               soup_socket_disconnect (io->sock);
+}
+
 #define SOUP_MESSAGE_IO_EOL            "\r\n"
 #define SOUP_MESSAGE_IO_EOL_LEN        2
 #define SOUP_MESSAGE_IO_DOUBLE_EOL     "\r\n\r\n"
@@ -106,7 +123,8 @@ static void
 soup_message_io_finished (SoupMessage *msg)
 {
        g_object_ref (msg);
-       soup_message_io_cancel (msg);
+       soup_message_io_stop (msg);
+       io_cleanup (msg);
        if (SOUP_MESSAGE_IS_STARTING (msg))
                soup_message_restarted (msg);
        else
@@ -128,7 +146,8 @@ io_error (SoupSocket *sock, SoupMessage *msg)
                return;
        }
 
-       soup_message_set_status (msg, SOUP_STATUS_IO_ERROR);
+       if (!SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code))
+               soup_message_set_status (msg, SOUP_STATUS_IO_ERROR);
        soup_message_io_finished (msg);
 }
 
@@ -648,6 +667,8 @@ new_iostate (SoupMessage *msg, SoupSocket *sock, SoupMessageIOMode mode,
        io->read_state  = SOUP_MESSAGE_IO_STATE_NOT_STARTED;
        io->write_state = SOUP_MESSAGE_IO_STATE_NOT_STARTED;
 
+       if (msg->priv->io_data)
+               io_cleanup (msg);
        msg->priv->io_data = io;
        return io;
 }
index 7fb4bfd..da7366b 100644 (file)
@@ -67,7 +67,7 @@ finalize (GObject *object)
 {
        SoupMessage *msg = SOUP_MESSAGE (object);
 
-       soup_message_io_cancel (msg);
+       soup_message_io_stop (msg);
 
        if (msg->priv->uri)
                soup_uri_free (msg->priv->uri);
@@ -393,7 +393,7 @@ soup_message_got_body (SoupMessage *msg)
 static void
 restarted (SoupMessage *req)
 {
-       soup_message_io_cancel (req);
+       soup_message_io_stop (req);
 }
 
 void
@@ -405,7 +405,7 @@ soup_message_restarted (SoupMessage *msg)
 static void
 finished (SoupMessage *req)
 {
-       soup_message_io_cancel (req);
+       soup_message_io_stop (req);
        req->status = SOUP_MESSAGE_STATUS_FINISHED;
 }
 
@@ -555,7 +555,7 @@ soup_message_cleanup_response (SoupMessage *req)
 
        soup_message_clear_headers (req->response_headers);
 
-       req->status_code = 0;
+       req->status_code = SOUP_STATUS_NONE;
        if (req->reason_phrase) {
                g_free ((char *) req->reason_phrase);
                req->reason_phrase = NULL;
@@ -637,9 +637,9 @@ soup_message_set_uri (SoupMessage *msg, const SoupUri *new_uri)
 
        if (msg->priv->uri && new_uri) {
                if (strcmp (msg->priv->uri->host, new_uri->host) != 0)
-                       soup_message_io_cancel (msg);
+                       soup_message_io_stop (msg);
        } else if (!new_uri)
-               soup_message_io_cancel (msg);
+               soup_message_io_stop (msg);
 
        if (msg->priv->uri)
                soup_uri_free (msg->priv->uri);
index f72fb7a..a248e43 100644 (file)
@@ -250,7 +250,7 @@ void           soup_message_send_request        (SoupMessage       *req,
                                                 gboolean           via_proxy);
 void           soup_message_read_request        (SoupMessage       *req,
                                                 SoupSocket        *sock);
-void           soup_message_io_cancel           (SoupMessage       *msg);
+void           soup_message_io_stop             (SoupMessage       *msg);
 void           soup_message_io_pause            (SoupMessage       *msg);
 void           soup_message_io_unpause          (SoupMessage       *msg);
 
index 6311073..519f2e0 100644 (file)
@@ -146,6 +146,15 @@ wait_for_connection (SoupSession *session, SoupMessage *msg)
        goto try_again;
 }
 
+static void
+connection_disconnected (SoupConnection *conn, gpointer user_data)
+{
+       SoupConnection **conn_p = user_data;
+
+       g_signal_handlers_disconnect_by_func (conn, connection_disconnected, conn_p);
+       *conn_p = NULL;
+}
+
 static guint
 send_message (SoupSession *session, SoupMessage *msg)
 {
@@ -160,11 +169,8 @@ send_message (SoupSession *session, SoupMessage *msg)
                if (!conn)
                        return msg->status_code;
 
-               /* Set up a weak pointer so that "conn" is zeroed out
-                * if the connection is destroyed.
-                */
-               g_object_add_weak_pointer (G_OBJECT (conn),
-                                          (gpointer *)&conn);
+               g_signal_connect (conn, "disconnected",
+                                 G_CALLBACK (connection_disconnected), &conn);
 
                /* Now repeatedly send the message across the connection
                 * until either it's done, or the connection is closed.
@@ -173,8 +179,7 @@ send_message (SoupSession *session, SoupMessage *msg)
                        soup_connection_send_request (conn, msg);
 
                if (conn) {
-                       g_object_remove_weak_pointer (G_OBJECT (conn),
-                                                     (gpointer *)&conn);
+                       g_signal_handlers_disconnect_by_func (conn, connection_disconnected, &conn);
                }
 
                /* If the message isn't finished, that means we need to
index ece538f..011e20d 100644 (file)
@@ -66,6 +66,8 @@ struct SoupSocketPrivate {
        guint           watch;
        guint           read_tag, write_tag, error_tag;
        GByteArray     *read_buf;
+
+       GMutex *iolock, *addrlock;
 };
 
 #ifdef HAVE_IPV6
@@ -88,26 +90,16 @@ init (GObject *object)
        sock->priv->sockfd = -1;
        sock->priv->non_blocking = sock->priv->nodelay = TRUE;
        sock->priv->reuseaddr = TRUE;
+       sock->priv->addrlock = g_mutex_new ();
+       sock->priv->iolock = g_mutex_new ();
 }
 
-static gboolean
+static void
 disconnect_internal (SoupSocket *sock)
 {
-       GIOChannel *iochannel;
-
-       /* If we close the socket from one thread while
-        * reading/writing from another, it's possible that the other
-        * thread will get an I/O error and try to close the socket
-        * while we're still in this function. So we clear
-        * sock->priv->iochannel early to make sure that the other
-        * thread's attempt to close the socket becomes a no-op.
-        */
-       iochannel = sock->priv->iochannel;
+       g_io_channel_unref (sock->priv->iochannel);
        sock->priv->iochannel = NULL;
-       if (iochannel == NULL)
-               return FALSE;
-
-       g_io_channel_unref (iochannel);
+       sock->priv->sockfd = -1;
 
        if (sock->priv->read_tag) {
                g_source_remove (sock->priv->read_tag);
@@ -121,8 +113,6 @@ disconnect_internal (SoupSocket *sock)
                g_source_remove (sock->priv->error_tag);
                sock->priv->error_tag = 0;
        }
-
-       return TRUE;
 }
 
 static void
@@ -141,6 +131,9 @@ finalize (GObject *object)
        if (sock->priv->watch)
                g_source_remove (sock->priv->watch);
 
+       g_mutex_free (sock->priv->addrlock);
+       g_mutex_free (sock->priv->iolock);
+
        g_free (sock->priv);
 
        G_OBJECT_CLASS (parent_class)->finalize (object);
@@ -245,7 +238,7 @@ update_fdflags (SoupSocket *sock)
 {
        int flags, opt;
 
-       if (!sock->priv->sockfd)
+       if (sock->priv->sockfd == -1)
                return;
 
        flags = fcntl (sock->priv->sockfd, F_GETFL, 0);
@@ -345,6 +338,7 @@ soup_socket_new (const char *optname1, ...)
 static GIOChannel *
 get_iochannel (SoupSocket *sock)
 {
+       g_mutex_lock (sock->priv->iolock);
        if (!sock->priv->iochannel) {
                sock->priv->iochannel =
                        g_io_channel_unix_new (sock->priv->sockfd);
@@ -352,6 +346,7 @@ get_iochannel (SoupSocket *sock)
                g_io_channel_set_encoding (sock->priv->iochannel, NULL, NULL);
                g_io_channel_set_buffered (sock->priv->iochannel, FALSE);
        }
+       g_mutex_unlock (sock->priv->iolock);
        return sock->priv->iochannel;
 }
 
@@ -738,9 +733,36 @@ soup_socket_server_new (SoupAddress *local_addr, gpointer ssl_creds,
 void
 soup_socket_disconnect (SoupSocket *sock)
 {
+       gboolean already_disconnected = FALSE;
+
        g_return_if_fail (SOUP_IS_SOCKET (sock));
 
-       if (!disconnect_internal (sock))
+       if (g_mutex_trylock (sock->priv->iolock)) {
+               if (sock->priv->iochannel)
+                       disconnect_internal (sock);
+               else
+                       already_disconnected = TRUE;
+               g_mutex_unlock (sock->priv->iolock);
+       } else {
+               int sockfd;
+
+               /* Another thread is currently doing IO, so
+                * we can't close the iochannel. So just kick
+                * the file descriptor out from under it.
+                */
+
+               sockfd = sock->priv->sockfd;
+               sock->priv->sockfd = -1;
+               if (sockfd == -1)
+                       already_disconnected = TRUE;
+               else {
+                       g_io_channel_set_close_on_unref (sock->priv->iochannel,
+                                                        FALSE);
+                       close (sockfd);
+               }
+       }
+
+       if (already_disconnected)
                return;
 
        /* Give all readers a chance to notice the connection close */
@@ -766,6 +788,7 @@ soup_socket_get_local_address (SoupSocket *sock)
 {
        g_return_val_if_fail (SOUP_IS_SOCKET (sock), NULL);
 
+       g_mutex_lock (sock->priv->addrlock);
        if (!sock->priv->local_addr) {
                struct soup_sockaddr_max bound_sa;
                int sa_len;
@@ -774,6 +797,7 @@ soup_socket_get_local_address (SoupSocket *sock)
                getsockname (sock->priv->sockfd, (struct sockaddr *)&bound_sa, &sa_len);
                sock->priv->local_addr = soup_address_new_from_sockaddr ((struct sockaddr *)&bound_sa, sa_len);
        }
+       g_mutex_unlock (sock->priv->addrlock);
 
        return sock->priv->local_addr;
 }
@@ -783,7 +807,8 @@ soup_socket_get_remote_address (SoupSocket *sock)
 {
        g_return_val_if_fail (SOUP_IS_SOCKET (sock), NULL);
 
-       if (!sock->priv->local_addr) {
+       g_mutex_lock (sock->priv->addrlock);
+       if (!sock->priv->remote_addr) {
                struct soup_sockaddr_max bound_sa;
                int sa_len;
 
@@ -791,6 +816,7 @@ soup_socket_get_remote_address (SoupSocket *sock)
                getpeername (sock->priv->sockfd, (struct sockaddr *)&bound_sa, &sa_len);
                sock->priv->remote_addr = soup_address_new_from_sockaddr ((struct sockaddr *)&bound_sa, sa_len);
        }
+       g_mutex_unlock (sock->priv->addrlock);
 
        return sock->priv->remote_addr;
 }
@@ -814,7 +840,7 @@ read_from_network (SoupSocket *sock, gpointer buffer, gsize len, gsize *nread)
 {
        GIOStatus status;
 
-       if (!sock->priv->iochannel)
+       if (!sock->priv->iochannel) 
                return SOUP_SOCKET_EOF;
 
        status = g_io_channel_read_chars (sock->priv->iochannel,
@@ -886,12 +912,18 @@ read_from_buf (SoupSocket *sock, gpointer buffer, gsize len, gsize *nread)
 SoupSocketIOStatus
 soup_socket_read (SoupSocket *sock, gpointer buffer, gsize len, gsize *nread)
 {
+       SoupSocketIOStatus status;
+
        g_return_val_if_fail (SOUP_IS_SOCKET (sock), SOUP_SOCKET_ERROR);
 
+       g_mutex_lock (sock->priv->iolock);
        if (sock->priv->read_buf)
-               return read_from_buf (sock, buffer, len, nread);
+               status = read_from_buf (sock, buffer, len, nread);
        else
-               return read_from_network (sock, buffer, len, nread);
+               status = read_from_network (sock, buffer, len, nread);
+       g_mutex_unlock (sock->priv->iolock);
+
+       return status;
 }
 
 /**
@@ -925,6 +957,8 @@ soup_socket_read_until (SoupSocket *sock, gpointer buffer, gsize len,
        g_return_val_if_fail (SOUP_IS_SOCKET (sock), SOUP_SOCKET_ERROR);
        g_return_val_if_fail (len >= boundary_len, SOUP_SOCKET_ERROR);
 
+       g_mutex_lock (sock->priv->iolock);
+
        *got_boundary = FALSE;
 
        if (!sock->priv->read_buf)
@@ -939,8 +973,10 @@ soup_socket_read_until (SoupSocket *sock, gpointer buffer, gsize len,
                                            len - prev_len, nread);
                read_buf->len = prev_len + *nread;
 
-               if (status != SOUP_SOCKET_OK)
+               if (status != SOUP_SOCKET_OK) {
+                       g_mutex_unlock (sock->priv->iolock);
                        return status;
+               }
        }
 
        /* Scan for the boundary */
@@ -958,7 +994,10 @@ soup_socket_read_until (SoupSocket *sock, gpointer buffer, gsize len,
         * buffer).
         */
        match_len = p - read_buf->data;
-       return read_from_buf (sock, buffer, MIN (len, match_len), nread);
+       status = read_from_buf (sock, buffer, MIN (len, match_len), nread);
+
+       g_mutex_unlock (sock->priv->iolock);
+       return status;
 }
 
 static gboolean
@@ -1002,23 +1041,34 @@ soup_socket_write (SoupSocket *sock, gconstpointer buffer,
 
        g_return_val_if_fail (SOUP_IS_SOCKET (sock), SOUP_SOCKET_ERROR);
 
-       if (!sock->priv->iochannel)
+       g_mutex_lock (sock->priv->iolock);
+
+       if (!sock->priv->iochannel) {
+               g_mutex_unlock (sock->priv->iolock);
                return SOUP_SOCKET_EOF;
-       if (sock->priv->write_tag)
+       }
+       if (sock->priv->write_tag) {
+               g_mutex_unlock (sock->priv->iolock);
                return SOUP_SOCKET_WOULD_BLOCK;
+       }
 
        pipe_handler = signal (SIGPIPE, SIG_IGN);
        status = g_io_channel_write_chars (sock->priv->iochannel,
                                           buffer, len, nwrote, NULL);
        signal (SIGPIPE, pipe_handler);
-       if (status != G_IO_STATUS_NORMAL && status != G_IO_STATUS_AGAIN)
+       if (status != G_IO_STATUS_NORMAL && status != G_IO_STATUS_AGAIN) {
+               g_mutex_unlock (sock->priv->iolock);
                return SOUP_SOCKET_ERROR;
+       }
 
-       if (*nwrote)
+       if (*nwrote) {
+               g_mutex_unlock (sock->priv->iolock);
                return SOUP_SOCKET_OK;
+       }
 
        sock->priv->write_tag =
                g_io_add_watch (sock->priv->iochannel, G_IO_OUT,
                                socket_write_watch, sock);
+       g_mutex_unlock (sock->priv->iolock);
        return SOUP_SOCKET_WOULD_BLOCK;
 }
index d4d643b..f0154ab 100644 (file)
@@ -25,8 +25,10 @@ typedef enum {
 #define SOUP_STATUS_IS_SERVER_ERROR(x)    ((x) >= 500 && (x) < 600)
 
 typedef enum {
+       SOUP_STATUS_NONE,
+
        /* Transport Errors */
-       SOUP_STATUS_CANCELLED                = 1,
+       SOUP_STATUS_CANCELLED                       = 1,
        SOUP_STATUS_CANT_RESOLVE,
        SOUP_STATUS_CANT_RESOLVE_PROXY,
        SOUP_STATUS_CANT_CONNECT,