Add locks to input/output streams
authorChenthill Palanisamy <pchenthill@novell.com>
Tue, 23 Feb 2010 00:56:31 +0000 (06:26 +0530)
committerChenthill Palanisamy <pchenthill@novell.com>
Wed, 24 Feb 2010 13:56:21 +0000 (19:26 +0530)
camel/providers/imapx/camel-imapx-server.c
camel/providers/imapx/camel-imapx-server.h

index ad50202..3eab8e1 100644 (file)
@@ -738,12 +738,8 @@ imapx_command_start (CamelIMAPXServer *imap, CamelIMAPXCommand *ic)
        gboolean ret = TRUE;
 
        camel_imapx_command_close(ic);
-
-       /* FIXME: assert the selected folder == ic->selected */
-
        cp = (CamelIMAPXCommandPart *)ic->parts.head;
        g_assert(cp->next);
-
        ic->current = cp;
 
        /* TODO: If we support literal+ we should be able to write the whole command out
@@ -754,25 +750,22 @@ imapx_command_start (CamelIMAPXServer *imap, CamelIMAPXCommand *ic)
 
        camel_dlist_addtail(&imap->active, (CamelDListNode *)ic);
 
+       g_static_rec_mutex_lock (&imap->ostream_lock);
+       
        c(printf("Staring command (active=%d,%s) %c%05u %s\r\n", camel_dlist_length(&imap->active), imap->literal?" literal":"", imap->tagprefix, ic->tag, cp->data));
        if (!imap->stream || camel_stream_printf((CamelStream *)imap->stream, "%c%05u %s\r\n", imap->tagprefix, ic->tag, cp->data) == -1) {
+               g_print ("Command start failed  \n");
                camel_exception_set (ic->ex, 1, "Command start failed");
                ret = FALSE;
-               camel_dlist_remove((CamelDListNode *)ic);
+               camel_dlist_remove ((CamelDListNode *)ic);
        }
-
+       
+       g_static_rec_mutex_unlock (&imap->ostream_lock);
+       
        return ret;
 }
 
-/* must have QUEUE lock */
-static void
-imapx_command_start_next(CamelIMAPXServer *is, CamelException *ex)
-{
-       CamelIMAPXCommand *ic, *nc;
-       gint count = 0;
-       gint pri = -128;
-
-       /* See if we can start another task yet.
+/* See if we can start another task yet.
 
        If we're waiting for a literal, we cannot proceed.
 
@@ -787,12 +780,17 @@ imapx_command_start_next(CamelIMAPXServer *is, CamelException *ex)
        If we dont, select the first folder required,
        then queue all the outstanding jobs on it, that
        are at least as high priority as the first.
+       
+       must have QUEUE lock */
 
-       This is very shitty code!
-       */
+static void
+imapx_command_start_next(CamelIMAPXServer *is, CamelException *ex)
+{
+       CamelIMAPXCommand *ic, *nc;
+       gint count = 0;
+       gint pri = -128;
 
        c(printf("** Starting next command\n"));
-
        if (is->literal != NULL || is->select_pending != NULL) {
                c(printf("* no, waiting for literal/pending select '%s'\n", is->select_pending->full_name));
                return;
@@ -1138,7 +1136,7 @@ imapx_untagged(CamelIMAPXServer *imap, CamelException *ex)
                if (finfo->got & FETCH_FLAGS && !(finfo->got & FETCH_UID)) {
                        if (imap->select_folder) {
                                CamelFolder *folder;
-                               CamelMessageInfo *mi;
+                               CamelMessageInfo *mi = NULL;
                                gboolean changed = FALSE;
                                gchar *uid = NULL;
 
@@ -1366,12 +1364,16 @@ imapx_continuation(CamelIMAPXServer *imap, CamelException *ex)
 
        printf("got continuation response\n");
 
+       CAMEL_SERVICE_REC_LOCK (imap->store, connect_lock);
        /* The 'literal' pointer is like a write-lock, nothing else
           can write while we have it ... so we dont need any
           ohter lock here.  All other writes go through
           queue-lock */
        if (imapx_idle_supported (imap) && imapx_in_idle (imap)) {
                camel_imapx_stream_skip (imap->stream, ex);
+
+               CAMEL_SERVICE_REC_UNLOCK (imap->store, connect_lock);
+               
                printf("Got continuation response for IDLE \n");
                imap->idle->started = TRUE;
 
@@ -1386,6 +1388,7 @@ imapx_continuation(CamelIMAPXServer *imap, CamelException *ex)
        ic = imap->literal;
        if (ic == NULL) {
                camel_imapx_stream_skip(imap->stream, ex);
+               CAMEL_SERVICE_REC_UNLOCK (imap->store, connect_lock);
                printf("got continuation response with no outstanding continuation requests?\n");
                return 1;
        }
@@ -1441,6 +1444,7 @@ imapx_continuation(CamelIMAPXServer *imap, CamelException *ex)
                /* should we just ignore? */
                imap->literal = NULL;
                camel_exception_set (ex, 1, "continuation response for non-continuation request");
+               CAMEL_SERVICE_REC_UNLOCK (imap->store, connect_lock);
                return -1;
        }
 
@@ -1460,6 +1464,8 @@ imapx_continuation(CamelIMAPXServer *imap, CamelException *ex)
                printf("%p: queueing continuation\n", ic);
                camel_stream_printf((CamelStream *)imap->stream, "\r\n");
        }
+       
+       CAMEL_SERVICE_REC_UNLOCK (imap->store, connect_lock);
 
        QUEUE_LOCK(imap);
        imap->literal = newliteral;
@@ -1531,7 +1537,10 @@ imapx_completion(CamelIMAPXServer *imap, guchar *token, gint len, CamelException
 
        camel_dlist_remove ((CamelDListNode *) ic);
        QUEUE_UNLOCK(imap);
+       
+       CAMEL_SERVICE_REC_LOCK (imap->store, connect_lock);
        ic->status = imapx_parse_status(imap->stream, ex);
+       CAMEL_SERVICE_REC_UNLOCK (imap->store, connect_lock);
 
        if (ic->complete)
                ic->complete (imap, ic);
@@ -1551,7 +1560,9 @@ imapx_step(CamelIMAPXServer *is, CamelException *ex)
        gint tok;
 
        // poll ?  wait for other stuff? loop?
+       CAMEL_SERVICE_REC_LOCK (is->store, connect_lock);
        tok = camel_imapx_stream_token (is->stream, &token, &len, ex);
+       CAMEL_SERVICE_REC_UNLOCK (is->store, connect_lock);
        if (camel_exception_is_set (ex))
                return;
 
@@ -1990,11 +2001,14 @@ imapx_connect (CamelIMAPXServer *is, gint ssl_mode, gint try_starttls, CamelExce
                        }
                        tcp_stream = camel_tcp_stream_ssl_new(is->session, is->url->host, SSL_PORT_FLAGS);
                }
+               is->is_ssl_stream = TRUE;
        } else {
                tcp_stream = camel_tcp_stream_raw_new ();
+               is->is_ssl_stream = FALSE;
        }
 #else
        tcp_stream = camel_tcp_stream_raw_new ();
+       is->is_ssl_stream = FALSE;
 #endif /* HAVE_SSL */
 
        hints.ai_socktype = SOCK_STREAM;
@@ -2428,7 +2442,6 @@ imapx_command_append_message_done (CamelIMAPXServer *is, CamelIMAPXCommand *ic)
                                                changes);
                                camel_folder_change_info_free (changes);
 
-                               camel_message_info_free(mi);
                                g_free(cur);
                        } else {
                                printf("but uidvalidity changed, uh ...\n");
@@ -3173,6 +3186,52 @@ cancel_all_jobs (CamelIMAPXServer *is, CamelException *ex)
 
 /* ********************************************************************** */
 
+static void
+parse_contents (CamelIMAPXServer *is, CamelException *ex)
+{
+       gint buffered = 0;
+
+       do {
+               imapx_step(is, ex);
+               
+               CAMEL_SERVICE_REC_LOCK (is->store, connect_lock);
+               
+               buffered = camel_imapx_stream_buffered (is->stream);
+
+               CAMEL_SERVICE_REC_UNLOCK (is->store, connect_lock);
+
+       } while (buffered && !camel_exception_is_set (ex));
+}
+
+/*
+   The main processing (reading) loop.
+
+   Incoming requests are added as jobs and tasks from other threads,
+   we just read the results from the server continously, and match
+   them up with the queued tasks as they come back.
+
+   Of course this loop can also initiate its own commands as well.
+
+   So, multiple threads can submit jobs, and write to the
+   stream (issue: locking stream for write?), but only this
+   thread can ever read from the stream.  This simplifies
+   locking, and greatly simplifies working out when new
+   work is ready.
+  
+   TODO:
+   This poll stuff wont work - we might block
+   waiting for results inside loops etc.
+
+   Requires a different approach:
+   +
+
+   New commands are queued in other threads as well
+   as this thread, and get pipelined over the socket.
+
+   Main area of locking required is command_queue
+   and command_start_next, the 'literal' command,
+   the jobs queue, the active queue, the queue
+   queue. */
 static gpointer
 imapx_parser_thread (gpointer d)
 {
@@ -3180,59 +3239,34 @@ imapx_parser_thread (gpointer d)
        CamelException ex = CAMEL_EXCEPTION_INITIALISER;
        CamelOperation *op;
 
-       /*
-         The main processing (reading) loop.
-
-         Incoming requests are added as jobs and tasks from other threads,
-         we just read the results from the server continously, and match
-         them up with the queued tasks as they come back.
-
-         Of course this loop can also initiate its own commands as well.
-
-         So, multiple threads can submit jobs, and write to the
-         stream (issue: locking stream for write?), but only this
-         thread can ever read from the stream.  This simplifies
-         locking, and greatly simplifies working out when new
-         work is ready.
-       */
-
-       e(printf("imapx server loop started\n"));
-
        op = camel_operation_new (NULL, NULL);
        op = camel_operation_register (op);
-       while (TRUE) {
 
+       while (TRUE) {
+               
                CAMEL_SERVICE_REC_LOCK (is->store, connect_lock);
+               
+               g_static_rec_mutex_lock (&is->ostream_lock);
                if (!is->stream)
                        imapx_reconnect(is, &ex);
-               CAMEL_SERVICE_REC_UNLOCK (is->store, connect_lock);
+               g_static_rec_mutex_unlock (&is->ostream_lock);
 
-               /* TODO:
-                  This poll stuff wont work - we might block
-                  waiting for results inside loops etc.
-
-                  Requires a different approach:
-                  +
-
-                  New commands are queued in other threads as well
-                  as this thread, and get pipelined over the socket.
-
-                  Main area of locking required is command_queue
-                  and command_start_next, the 'literal' command,
-                  the jobs queue, the active queue, the queue
-                  queue. */
+               CAMEL_SERVICE_REC_UNLOCK (is->store, connect_lock);
 
-               /* if ssl stream ... */
 #ifdef HAVE_SSL
-               if (CAMEL_IS_TCP_STREAM_SSL (is->stream->source))
+               if (is->is_ssl_stream)
                {
                        PRPollDesc pollfds[2] = { };
                        gint res;
 
+                       CAMEL_SERVICE_REC_LOCK (is->store, connect_lock);
+
                        pollfds[0].fd = camel_tcp_stream_ssl_sockfd ((CamelTcpStreamSSL *)is->stream->source);
                        pollfds[0].in_flags = PR_POLL_READ;
                        pollfds[1].fd = camel_operation_cancel_prfd (op);
                        pollfds[1].in_flags = PR_POLL_READ;
+
+                       CAMEL_SERVICE_REC_UNLOCK (is->store, connect_lock);
 #include <prio.h>
 
                        res = PR_Poll(pollfds, 2, PR_MillisecondsToInterval (30 * 1000));
@@ -3241,27 +3275,25 @@ imapx_parser_thread (gpointer d)
                        else if (res == 0) {
                                /* timed out */
                        } else if ((pollfds[0].out_flags & PR_POLL_READ)) {
-                               do {
-                                       /* This is quite shitty, it will often block on each
-                                          part of the decode, causing significant
-                                          processing delays. */
-                                       imapx_step(is, &ex);
-                               } while (camel_imapx_stream_buffered(is->stream) && !camel_exception_is_set (&ex));
-                       } else if (pollfds[1].out_flags & PR_POLL_READ) {
+                               parse_contents (is, &ex);
+                       } else if (pollfds[1].out_flags & PR_POLL_READ)
                                errno = EINTR;
-                       }
                }
 #endif
 
-               if (!CAMEL_IS_TCP_STREAM_SSL (is->stream->source))
+               if (!is->is_ssl_stream)
                {
                        struct pollfd fds[2] = { {0, 0, 0}, {0, 0, 0} };
                        gint res;
 
+                       CAMEL_SERVICE_REC_LOCK (is->store, connect_lock);
+                       
                        fds[0].fd = ((CamelTcpStreamRaw *)is->stream->source)->sockfd;
                        fds[0].events = POLLIN;
                        fds[1].fd = camel_operation_cancel_fd (op);
                        fds[1].events = POLLIN;
+                       
+                       CAMEL_SERVICE_REC_UNLOCK (is->store, connect_lock);
 
                        res = poll(fds, 2, 1000*30);
                        if (res == -1)
@@ -3269,12 +3301,9 @@ imapx_parser_thread (gpointer d)
                        else if (res == 0)
                                /* timed out */;
                        else if (fds[0].revents & POLLIN) {
-                               do {
-                                       imapx_step(is, &ex);
-                               } while (camel_imapx_stream_buffered(is->stream) && !camel_exception_is_set (&ex));
-                       } else if (fds[1].revents & POLLIN) {
+                               parse_contents (is, &ex);
+                       } else if (fds[1].revents & POLLIN)
                                errno = EINTR;
-                       }
                }
 
                if (errno == EINTR)
@@ -3282,11 +3311,7 @@ imapx_parser_thread (gpointer d)
 
                if (camel_exception_is_set (&ex)) {
                        if (errno == EINTR || !g_ascii_strcasecmp (ex.desc, "io error")) {
-
-                               CAMEL_SERVICE_REC_LOCK (is->store, connect_lock);
                                imapx_disconnect (is);
-                               CAMEL_SERVICE_REC_UNLOCK (is->store, connect_lock);
-
                                cancel_all_jobs (is, &ex);
 
                                if (imapx_idle_supported (is))
@@ -3294,13 +3319,13 @@ imapx_parser_thread (gpointer d)
                        }
 
                        if (errno == EINTR)
-                               return NULL;
+                               goto quit;
 
                        camel_exception_clear (&ex);
-                       sleep(1);
                }
        }
 
+quit:
        if (op)
                camel_operation_unref (op);
 
@@ -3327,6 +3352,7 @@ imapx_server_init(CamelIMAPXServer *ie, CamelIMAPXServerClass *ieclass)
        ie->job_timeout = 29 * 60 * 1000 * 1000;
 
        ie->queue_lock = g_mutex_new();
+       g_static_rec_mutex_init (&ie->ostream_lock);
 
        ie->tagprefix = ieclass->tagprefix;
        ieclass->tagprefix++;
@@ -3344,6 +3370,7 @@ static void
 imapx_server_finalise(CamelIMAPXServer *ie, CamelIMAPXServerClass *ieclass)
 {
        g_mutex_free(ie->queue_lock);
+       g_static_rec_mutex_free (&ie->ostream_lock);
 
        camel_folder_change_info_free (ie->changes);
 }
@@ -3387,6 +3414,9 @@ imapx_disconnect (CamelIMAPXServer *is)
 {
        gboolean ret = TRUE;
 
+       CAMEL_SERVICE_REC_LOCK (is->store, connect_lock);
+       g_static_rec_mutex_lock (&is->ostream_lock);
+
        if (is->stream) {
                if (camel_stream_close (is->stream->source) == -1)
                        ret = FALSE;
@@ -3416,6 +3446,9 @@ imapx_disconnect (CamelIMAPXServer *is)
                camel_imapx_command_free (is->literal);
                is->literal = NULL;
        }
+                               
+       g_static_rec_mutex_unlock (&is->ostream_lock);
+       CAMEL_SERVICE_REC_UNLOCK (is->store, connect_lock);
 
        return ret;
 }
@@ -3436,7 +3469,9 @@ camel_imapx_server_connect(CamelIMAPXServer *is, gint state)
                        goto exit;
                }
 
+               g_static_rec_mutex_lock (&is->ostream_lock);
                imapx_reconnect (is, &ex);
+               g_static_rec_mutex_unlock (&is->ostream_lock);
                if (camel_exception_is_set (&ex)) {
                        ret = FALSE;
                        goto exit;
index bd984e5..585018c 100644 (file)
@@ -51,6 +51,7 @@ struct _CamelIMAPXServer {
        struct _CamelURL *url;
        struct _CamelIMAPXStream *stream;
        struct _capability_info *cinfo;
+       gboolean is_ssl_stream;
 
        CamelIMAPXNamespaceList *nsl;
 
@@ -89,6 +90,7 @@ struct _CamelIMAPXServer {
        GSList *expunged;
 
        pthread_t parser_thread_id;
+       GStaticRecMutex ostream_lock;
 
        /* Idle */
        struct _CamelIMAPXIdle *idle;