+2003-12-22 Dan Winship <danw@ximian.com>
+
+ * libsoup/soup-socket.c: Lots of thread-safety stuff, primarly so
+ you can disconnect a socket from one thread while doing I/O in
+ another.
+
+ * libsoup/soup-message-io.c (soup_message_io_cancel): Split into
+ soup_message_io_stop() and io_cleanup(), to separate out the "stop
+ reading/writing" and "free data" phases to allow thread-safe
+ synchronous cancellation.
+ (soup_message_io_finished): call both soup_message_io_stop() and
+ io_cleanup()
+ (io_error): Only set SOUP_STATUS_IO_ERROR on the message if it
+ doesn't already have a transport error status (eg, CANCELLED).
+ (new_iostate): Call io_cleanup() if needed.
+
+ * libsoup/soup-status.h: add "SOUP_STATUS_NONE" for 0, to make it
+ clearer that it's not a status.
+
+ * libsoup/soup-message.c (finalize, restarted, finished,
+ soup_message_set_uri): s/soup_message_io_cancel/soup_message_io_stop/
+ (soup_message_cleanup_response): s/0/SOUP_STATUS_NONE/
+
+ * libsoup/soup-connection.c (send_request): Remove
+ soup_message_io_cancel call.
+
+ * libsoup/soup-session-sync.c (send_message): Connect to the
+ connection's "disconnected" signal rather than using a weak ref,
+ since that's what we really care about, and it's possible that the
+ connection may have an extra ref on it somewhere that would keep
+ it from being destroyed even if it was disconnected.
+
2003-12-20 Joe Shaw <joe@ximian.com>
* libsoup/soup-session.c (lookup_auth): If const_path is NULL un
#define RESPONSE_BLOCK_SIZE 8192
-void
-soup_message_io_cancel (SoupMessage *msg)
+static void
+io_cleanup (SoupMessage *msg)
{
SoupMessageIOData *io = msg->priv->io_data;
if (!io)
return;
- if (io->read_tag)
- g_signal_handler_disconnect (io->sock, io->read_tag);
- if (io->write_tag)
- g_signal_handler_disconnect (io->sock, io->write_tag);
- if (io->err_tag)
- g_signal_handler_disconnect (io->sock, io->err_tag);
-
- if (io->read_state != SOUP_MESSAGE_IO_STATE_DONE)
- soup_socket_disconnect (io->sock);
- g_object_unref (io->sock);
+ if (io->sock)
+ g_object_unref (io->sock);
if (io->read_buf)
g_byte_array_free (io->read_buf, TRUE);
msg->priv->io_data = NULL;
}
+void
+soup_message_io_stop (SoupMessage *msg)
+{
+ SoupMessageIOData *io = msg->priv->io_data;
+
+ if (!io)
+ return;
+
+ if (io->read_tag) {
+ g_signal_handler_disconnect (io->sock, io->read_tag);
+ io->read_tag = 0;
+ }
+ if (io->write_tag) {
+ g_signal_handler_disconnect (io->sock, io->write_tag);
+ io->write_tag = 0;
+ }
+ if (io->err_tag) {
+ g_signal_handler_disconnect (io->sock, io->err_tag);
+ io->err_tag = 0;
+ }
+
+ if (io->read_state != SOUP_MESSAGE_IO_STATE_DONE)
+ soup_socket_disconnect (io->sock);
+}
+
#define SOUP_MESSAGE_IO_EOL "\r\n"
#define SOUP_MESSAGE_IO_EOL_LEN 2
#define SOUP_MESSAGE_IO_DOUBLE_EOL "\r\n\r\n"
soup_message_io_finished (SoupMessage *msg)
{
g_object_ref (msg);
- soup_message_io_cancel (msg);
+ soup_message_io_stop (msg);
+ io_cleanup (msg);
if (SOUP_MESSAGE_IS_STARTING (msg))
soup_message_restarted (msg);
else
return;
}
- soup_message_set_status (msg, SOUP_STATUS_IO_ERROR);
+ if (!SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code))
+ soup_message_set_status (msg, SOUP_STATUS_IO_ERROR);
soup_message_io_finished (msg);
}
io->read_state = SOUP_MESSAGE_IO_STATE_NOT_STARTED;
io->write_state = SOUP_MESSAGE_IO_STATE_NOT_STARTED;
+ if (msg->priv->io_data)
+ io_cleanup (msg);
msg->priv->io_data = io;
return io;
}
guint watch;
guint read_tag, write_tag, error_tag;
GByteArray *read_buf;
+
+ GMutex *iolock, *addrlock;
};
#ifdef HAVE_IPV6
sock->priv->sockfd = -1;
sock->priv->non_blocking = sock->priv->nodelay = TRUE;
sock->priv->reuseaddr = TRUE;
+ sock->priv->addrlock = g_mutex_new ();
+ sock->priv->iolock = g_mutex_new ();
}
-static gboolean
+static void
disconnect_internal (SoupSocket *sock)
{
- GIOChannel *iochannel;
-
- /* If we close the socket from one thread while
- * reading/writing from another, it's possible that the other
- * thread will get an I/O error and try to close the socket
- * while we're still in this function. So we clear
- * sock->priv->iochannel early to make sure that the other
- * thread's attempt to close the socket becomes a no-op.
- */
- iochannel = sock->priv->iochannel;
+ g_io_channel_unref (sock->priv->iochannel);
sock->priv->iochannel = NULL;
- if (iochannel == NULL)
- return FALSE;
-
- g_io_channel_unref (iochannel);
+ sock->priv->sockfd = -1;
if (sock->priv->read_tag) {
g_source_remove (sock->priv->read_tag);
g_source_remove (sock->priv->error_tag);
sock->priv->error_tag = 0;
}
-
- return TRUE;
}
static void
if (sock->priv->watch)
g_source_remove (sock->priv->watch);
+ g_mutex_free (sock->priv->addrlock);
+ g_mutex_free (sock->priv->iolock);
+
g_free (sock->priv);
G_OBJECT_CLASS (parent_class)->finalize (object);
{
int flags, opt;
- if (!sock->priv->sockfd)
+ if (sock->priv->sockfd == -1)
return;
flags = fcntl (sock->priv->sockfd, F_GETFL, 0);
static GIOChannel *
get_iochannel (SoupSocket *sock)
{
+ g_mutex_lock (sock->priv->iolock);
if (!sock->priv->iochannel) {
sock->priv->iochannel =
g_io_channel_unix_new (sock->priv->sockfd);
g_io_channel_set_encoding (sock->priv->iochannel, NULL, NULL);
g_io_channel_set_buffered (sock->priv->iochannel, FALSE);
}
+ g_mutex_unlock (sock->priv->iolock);
return sock->priv->iochannel;
}
void
soup_socket_disconnect (SoupSocket *sock)
{
+ gboolean already_disconnected = FALSE;
+
g_return_if_fail (SOUP_IS_SOCKET (sock));
- if (!disconnect_internal (sock))
+ if (g_mutex_trylock (sock->priv->iolock)) {
+ if (sock->priv->iochannel)
+ disconnect_internal (sock);
+ else
+ already_disconnected = TRUE;
+ g_mutex_unlock (sock->priv->iolock);
+ } else {
+ int sockfd;
+
+ /* Another thread is currently doing IO, so
+ * we can't close the iochannel. So just kick
+ * the file descriptor out from under it.
+ */
+
+ sockfd = sock->priv->sockfd;
+ sock->priv->sockfd = -1;
+ if (sockfd == -1)
+ already_disconnected = TRUE;
+ else {
+ g_io_channel_set_close_on_unref (sock->priv->iochannel,
+ FALSE);
+ close (sockfd);
+ }
+ }
+
+ if (already_disconnected)
return;
/* Give all readers a chance to notice the connection close */
{
g_return_val_if_fail (SOUP_IS_SOCKET (sock), NULL);
+ g_mutex_lock (sock->priv->addrlock);
if (!sock->priv->local_addr) {
struct soup_sockaddr_max bound_sa;
int sa_len;
getsockname (sock->priv->sockfd, (struct sockaddr *)&bound_sa, &sa_len);
sock->priv->local_addr = soup_address_new_from_sockaddr ((struct sockaddr *)&bound_sa, sa_len);
}
+ g_mutex_unlock (sock->priv->addrlock);
return sock->priv->local_addr;
}
{
g_return_val_if_fail (SOUP_IS_SOCKET (sock), NULL);
- if (!sock->priv->local_addr) {
+ g_mutex_lock (sock->priv->addrlock);
+ if (!sock->priv->remote_addr) {
struct soup_sockaddr_max bound_sa;
int sa_len;
getpeername (sock->priv->sockfd, (struct sockaddr *)&bound_sa, &sa_len);
sock->priv->remote_addr = soup_address_new_from_sockaddr ((struct sockaddr *)&bound_sa, sa_len);
}
+ g_mutex_unlock (sock->priv->addrlock);
return sock->priv->remote_addr;
}
{
GIOStatus status;
- if (!sock->priv->iochannel)
+ if (!sock->priv->iochannel)
return SOUP_SOCKET_EOF;
status = g_io_channel_read_chars (sock->priv->iochannel,
SoupSocketIOStatus
soup_socket_read (SoupSocket *sock, gpointer buffer, gsize len, gsize *nread)
{
+ SoupSocketIOStatus status;
+
g_return_val_if_fail (SOUP_IS_SOCKET (sock), SOUP_SOCKET_ERROR);
+ g_mutex_lock (sock->priv->iolock);
if (sock->priv->read_buf)
- return read_from_buf (sock, buffer, len, nread);
+ status = read_from_buf (sock, buffer, len, nread);
else
- return read_from_network (sock, buffer, len, nread);
+ status = read_from_network (sock, buffer, len, nread);
+ g_mutex_unlock (sock->priv->iolock);
+
+ return status;
}
/**
g_return_val_if_fail (SOUP_IS_SOCKET (sock), SOUP_SOCKET_ERROR);
g_return_val_if_fail (len >= boundary_len, SOUP_SOCKET_ERROR);
+ g_mutex_lock (sock->priv->iolock);
+
*got_boundary = FALSE;
if (!sock->priv->read_buf)
len - prev_len, nread);
read_buf->len = prev_len + *nread;
- if (status != SOUP_SOCKET_OK)
+ if (status != SOUP_SOCKET_OK) {
+ g_mutex_unlock (sock->priv->iolock);
return status;
+ }
}
/* Scan for the boundary */
* buffer).
*/
match_len = p - read_buf->data;
- return read_from_buf (sock, buffer, MIN (len, match_len), nread);
+ status = read_from_buf (sock, buffer, MIN (len, match_len), nread);
+
+ g_mutex_unlock (sock->priv->iolock);
+ return status;
}
static gboolean
g_return_val_if_fail (SOUP_IS_SOCKET (sock), SOUP_SOCKET_ERROR);
- if (!sock->priv->iochannel)
+ g_mutex_lock (sock->priv->iolock);
+
+ if (!sock->priv->iochannel) {
+ g_mutex_unlock (sock->priv->iolock);
return SOUP_SOCKET_EOF;
- if (sock->priv->write_tag)
+ }
+ if (sock->priv->write_tag) {
+ g_mutex_unlock (sock->priv->iolock);
return SOUP_SOCKET_WOULD_BLOCK;
+ }
pipe_handler = signal (SIGPIPE, SIG_IGN);
status = g_io_channel_write_chars (sock->priv->iochannel,
buffer, len, nwrote, NULL);
signal (SIGPIPE, pipe_handler);
- if (status != G_IO_STATUS_NORMAL && status != G_IO_STATUS_AGAIN)
+ if (status != G_IO_STATUS_NORMAL && status != G_IO_STATUS_AGAIN) {
+ g_mutex_unlock (sock->priv->iolock);
return SOUP_SOCKET_ERROR;
+ }
- if (*nwrote)
+ if (*nwrote) {
+ g_mutex_unlock (sock->priv->iolock);
return SOUP_SOCKET_OK;
+ }
sock->priv->write_tag =
g_io_add_watch (sock->priv->iochannel, G_IO_OUT,
socket_write_watch, sock);
+ g_mutex_unlock (sock->priv->iolock);
return SOUP_SOCKET_WOULD_BLOCK;
}