From: Chenthill Palanisamy Date: Tue, 23 Feb 2010 00:56:31 +0000 (+0530) Subject: Add locks to input/output streams X-Git-Tag: upstream/3.7.4~3354 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=6b8c7302d50dbb71be2f678b62e6a38e7fadf552;p=platform%2Fupstream%2Fevolution-data-server.git Add locks to input/output streams --- diff --git a/camel/providers/imapx/camel-imapx-server.c b/camel/providers/imapx/camel-imapx-server.c index ad50202..3eab8e1 100644 --- a/camel/providers/imapx/camel-imapx-server.c +++ b/camel/providers/imapx/camel-imapx-server.c @@ -738,12 +738,8 @@ imapx_command_start (CamelIMAPXServer *imap, CamelIMAPXCommand *ic) gboolean ret = TRUE; camel_imapx_command_close(ic); - - /* FIXME: assert the selected folder == ic->selected */ - cp = (CamelIMAPXCommandPart *)ic->parts.head; g_assert(cp->next); - ic->current = cp; /* TODO: If we support literal+ we should be able to write the whole command out @@ -754,25 +750,22 @@ imapx_command_start (CamelIMAPXServer *imap, CamelIMAPXCommand *ic) camel_dlist_addtail(&imap->active, (CamelDListNode *)ic); + g_static_rec_mutex_lock (&imap->ostream_lock); + c(printf("Staring command (active=%d,%s) %c%05u %s\r\n", camel_dlist_length(&imap->active), imap->literal?" literal":"", imap->tagprefix, ic->tag, cp->data)); if (!imap->stream || camel_stream_printf((CamelStream *)imap->stream, "%c%05u %s\r\n", imap->tagprefix, ic->tag, cp->data) == -1) { + g_print ("Command start failed \n"); camel_exception_set (ic->ex, 1, "Command start failed"); ret = FALSE; - camel_dlist_remove((CamelDListNode *)ic); + camel_dlist_remove ((CamelDListNode *)ic); } - + + g_static_rec_mutex_unlock (&imap->ostream_lock); + return ret; } -/* must have QUEUE lock */ -static void -imapx_command_start_next(CamelIMAPXServer *is, CamelException *ex) -{ - CamelIMAPXCommand *ic, *nc; - gint count = 0; - gint pri = -128; - - /* See if we can start another task yet. +/* See if we can start another task yet. If we're waiting for a literal, we cannot proceed. @@ -787,12 +780,17 @@ imapx_command_start_next(CamelIMAPXServer *is, CamelException *ex) If we dont, select the first folder required, then queue all the outstanding jobs on it, that are at least as high priority as the first. + + must have QUEUE lock */ - This is very shitty code! - */ +static void +imapx_command_start_next(CamelIMAPXServer *is, CamelException *ex) +{ + CamelIMAPXCommand *ic, *nc; + gint count = 0; + gint pri = -128; c(printf("** Starting next command\n")); - if (is->literal != NULL || is->select_pending != NULL) { c(printf("* no, waiting for literal/pending select '%s'\n", is->select_pending->full_name)); return; @@ -1138,7 +1136,7 @@ imapx_untagged(CamelIMAPXServer *imap, CamelException *ex) if (finfo->got & FETCH_FLAGS && !(finfo->got & FETCH_UID)) { if (imap->select_folder) { CamelFolder *folder; - CamelMessageInfo *mi; + CamelMessageInfo *mi = NULL; gboolean changed = FALSE; gchar *uid = NULL; @@ -1366,12 +1364,16 @@ imapx_continuation(CamelIMAPXServer *imap, CamelException *ex) printf("got continuation response\n"); + CAMEL_SERVICE_REC_LOCK (imap->store, connect_lock); /* The 'literal' pointer is like a write-lock, nothing else can write while we have it ... so we dont need any ohter lock here. All other writes go through queue-lock */ if (imapx_idle_supported (imap) && imapx_in_idle (imap)) { camel_imapx_stream_skip (imap->stream, ex); + + CAMEL_SERVICE_REC_UNLOCK (imap->store, connect_lock); + printf("Got continuation response for IDLE \n"); imap->idle->started = TRUE; @@ -1386,6 +1388,7 @@ imapx_continuation(CamelIMAPXServer *imap, CamelException *ex) ic = imap->literal; if (ic == NULL) { camel_imapx_stream_skip(imap->stream, ex); + CAMEL_SERVICE_REC_UNLOCK (imap->store, connect_lock); printf("got continuation response with no outstanding continuation requests?\n"); return 1; } @@ -1441,6 +1444,7 @@ imapx_continuation(CamelIMAPXServer *imap, CamelException *ex) /* should we just ignore? */ imap->literal = NULL; camel_exception_set (ex, 1, "continuation response for non-continuation request"); + CAMEL_SERVICE_REC_UNLOCK (imap->store, connect_lock); return -1; } @@ -1460,6 +1464,8 @@ imapx_continuation(CamelIMAPXServer *imap, CamelException *ex) printf("%p: queueing continuation\n", ic); camel_stream_printf((CamelStream *)imap->stream, "\r\n"); } + + CAMEL_SERVICE_REC_UNLOCK (imap->store, connect_lock); QUEUE_LOCK(imap); imap->literal = newliteral; @@ -1531,7 +1537,10 @@ imapx_completion(CamelIMAPXServer *imap, guchar *token, gint len, CamelException camel_dlist_remove ((CamelDListNode *) ic); QUEUE_UNLOCK(imap); + + CAMEL_SERVICE_REC_LOCK (imap->store, connect_lock); ic->status = imapx_parse_status(imap->stream, ex); + CAMEL_SERVICE_REC_UNLOCK (imap->store, connect_lock); if (ic->complete) ic->complete (imap, ic); @@ -1551,7 +1560,9 @@ imapx_step(CamelIMAPXServer *is, CamelException *ex) gint tok; // poll ? wait for other stuff? loop? + CAMEL_SERVICE_REC_LOCK (is->store, connect_lock); tok = camel_imapx_stream_token (is->stream, &token, &len, ex); + CAMEL_SERVICE_REC_UNLOCK (is->store, connect_lock); if (camel_exception_is_set (ex)) return; @@ -1990,11 +2001,14 @@ imapx_connect (CamelIMAPXServer *is, gint ssl_mode, gint try_starttls, CamelExce } tcp_stream = camel_tcp_stream_ssl_new(is->session, is->url->host, SSL_PORT_FLAGS); } + is->is_ssl_stream = TRUE; } else { tcp_stream = camel_tcp_stream_raw_new (); + is->is_ssl_stream = FALSE; } #else tcp_stream = camel_tcp_stream_raw_new (); + is->is_ssl_stream = FALSE; #endif /* HAVE_SSL */ hints.ai_socktype = SOCK_STREAM; @@ -2428,7 +2442,6 @@ imapx_command_append_message_done (CamelIMAPXServer *is, CamelIMAPXCommand *ic) changes); camel_folder_change_info_free (changes); - camel_message_info_free(mi); g_free(cur); } else { printf("but uidvalidity changed, uh ...\n"); @@ -3173,6 +3186,52 @@ cancel_all_jobs (CamelIMAPXServer *is, CamelException *ex) /* ********************************************************************** */ +static void +parse_contents (CamelIMAPXServer *is, CamelException *ex) +{ + gint buffered = 0; + + do { + imapx_step(is, ex); + + CAMEL_SERVICE_REC_LOCK (is->store, connect_lock); + + buffered = camel_imapx_stream_buffered (is->stream); + + CAMEL_SERVICE_REC_UNLOCK (is->store, connect_lock); + + } while (buffered && !camel_exception_is_set (ex)); +} + +/* + The main processing (reading) loop. + + Incoming requests are added as jobs and tasks from other threads, + we just read the results from the server continously, and match + them up with the queued tasks as they come back. + + Of course this loop can also initiate its own commands as well. + + So, multiple threads can submit jobs, and write to the + stream (issue: locking stream for write?), but only this + thread can ever read from the stream. This simplifies + locking, and greatly simplifies working out when new + work is ready. + + TODO: + This poll stuff wont work - we might block + waiting for results inside loops etc. + + Requires a different approach: + + + + New commands are queued in other threads as well + as this thread, and get pipelined over the socket. + + Main area of locking required is command_queue + and command_start_next, the 'literal' command, + the jobs queue, the active queue, the queue + queue. */ static gpointer imapx_parser_thread (gpointer d) { @@ -3180,59 +3239,34 @@ imapx_parser_thread (gpointer d) CamelException ex = CAMEL_EXCEPTION_INITIALISER; CamelOperation *op; - /* - The main processing (reading) loop. - - Incoming requests are added as jobs and tasks from other threads, - we just read the results from the server continously, and match - them up with the queued tasks as they come back. - - Of course this loop can also initiate its own commands as well. - - So, multiple threads can submit jobs, and write to the - stream (issue: locking stream for write?), but only this - thread can ever read from the stream. This simplifies - locking, and greatly simplifies working out when new - work is ready. - */ - - e(printf("imapx server loop started\n")); - op = camel_operation_new (NULL, NULL); op = camel_operation_register (op); - while (TRUE) { + while (TRUE) { + CAMEL_SERVICE_REC_LOCK (is->store, connect_lock); + + g_static_rec_mutex_lock (&is->ostream_lock); if (!is->stream) imapx_reconnect(is, &ex); - CAMEL_SERVICE_REC_UNLOCK (is->store, connect_lock); + g_static_rec_mutex_unlock (&is->ostream_lock); - /* TODO: - This poll stuff wont work - we might block - waiting for results inside loops etc. - - Requires a different approach: - + - - New commands are queued in other threads as well - as this thread, and get pipelined over the socket. - - Main area of locking required is command_queue - and command_start_next, the 'literal' command, - the jobs queue, the active queue, the queue - queue. */ + CAMEL_SERVICE_REC_UNLOCK (is->store, connect_lock); - /* if ssl stream ... */ #ifdef HAVE_SSL - if (CAMEL_IS_TCP_STREAM_SSL (is->stream->source)) + if (is->is_ssl_stream) { PRPollDesc pollfds[2] = { }; gint res; + CAMEL_SERVICE_REC_LOCK (is->store, connect_lock); + pollfds[0].fd = camel_tcp_stream_ssl_sockfd ((CamelTcpStreamSSL *)is->stream->source); pollfds[0].in_flags = PR_POLL_READ; pollfds[1].fd = camel_operation_cancel_prfd (op); pollfds[1].in_flags = PR_POLL_READ; + + CAMEL_SERVICE_REC_UNLOCK (is->store, connect_lock); #include res = PR_Poll(pollfds, 2, PR_MillisecondsToInterval (30 * 1000)); @@ -3241,27 +3275,25 @@ imapx_parser_thread (gpointer d) else if (res == 0) { /* timed out */ } else if ((pollfds[0].out_flags & PR_POLL_READ)) { - do { - /* This is quite shitty, it will often block on each - part of the decode, causing significant - processing delays. */ - imapx_step(is, &ex); - } while (camel_imapx_stream_buffered(is->stream) && !camel_exception_is_set (&ex)); - } else if (pollfds[1].out_flags & PR_POLL_READ) { + parse_contents (is, &ex); + } else if (pollfds[1].out_flags & PR_POLL_READ) errno = EINTR; - } } #endif - if (!CAMEL_IS_TCP_STREAM_SSL (is->stream->source)) + if (!is->is_ssl_stream) { struct pollfd fds[2] = { {0, 0, 0}, {0, 0, 0} }; gint res; + CAMEL_SERVICE_REC_LOCK (is->store, connect_lock); + fds[0].fd = ((CamelTcpStreamRaw *)is->stream->source)->sockfd; fds[0].events = POLLIN; fds[1].fd = camel_operation_cancel_fd (op); fds[1].events = POLLIN; + + CAMEL_SERVICE_REC_UNLOCK (is->store, connect_lock); res = poll(fds, 2, 1000*30); if (res == -1) @@ -3269,12 +3301,9 @@ imapx_parser_thread (gpointer d) else if (res == 0) /* timed out */; else if (fds[0].revents & POLLIN) { - do { - imapx_step(is, &ex); - } while (camel_imapx_stream_buffered(is->stream) && !camel_exception_is_set (&ex)); - } else if (fds[1].revents & POLLIN) { + parse_contents (is, &ex); + } else if (fds[1].revents & POLLIN) errno = EINTR; - } } if (errno == EINTR) @@ -3282,11 +3311,7 @@ imapx_parser_thread (gpointer d) if (camel_exception_is_set (&ex)) { if (errno == EINTR || !g_ascii_strcasecmp (ex.desc, "io error")) { - - CAMEL_SERVICE_REC_LOCK (is->store, connect_lock); imapx_disconnect (is); - CAMEL_SERVICE_REC_UNLOCK (is->store, connect_lock); - cancel_all_jobs (is, &ex); if (imapx_idle_supported (is)) @@ -3294,13 +3319,13 @@ imapx_parser_thread (gpointer d) } if (errno == EINTR) - return NULL; + goto quit; camel_exception_clear (&ex); - sleep(1); } } +quit: if (op) camel_operation_unref (op); @@ -3327,6 +3352,7 @@ imapx_server_init(CamelIMAPXServer *ie, CamelIMAPXServerClass *ieclass) ie->job_timeout = 29 * 60 * 1000 * 1000; ie->queue_lock = g_mutex_new(); + g_static_rec_mutex_init (&ie->ostream_lock); ie->tagprefix = ieclass->tagprefix; ieclass->tagprefix++; @@ -3344,6 +3370,7 @@ static void imapx_server_finalise(CamelIMAPXServer *ie, CamelIMAPXServerClass *ieclass) { g_mutex_free(ie->queue_lock); + g_static_rec_mutex_free (&ie->ostream_lock); camel_folder_change_info_free (ie->changes); } @@ -3387,6 +3414,9 @@ imapx_disconnect (CamelIMAPXServer *is) { gboolean ret = TRUE; + CAMEL_SERVICE_REC_LOCK (is->store, connect_lock); + g_static_rec_mutex_lock (&is->ostream_lock); + if (is->stream) { if (camel_stream_close (is->stream->source) == -1) ret = FALSE; @@ -3416,6 +3446,9 @@ imapx_disconnect (CamelIMAPXServer *is) camel_imapx_command_free (is->literal); is->literal = NULL; } + + g_static_rec_mutex_unlock (&is->ostream_lock); + CAMEL_SERVICE_REC_UNLOCK (is->store, connect_lock); return ret; } @@ -3436,7 +3469,9 @@ camel_imapx_server_connect(CamelIMAPXServer *is, gint state) goto exit; } + g_static_rec_mutex_lock (&is->ostream_lock); imapx_reconnect (is, &ex); + g_static_rec_mutex_unlock (&is->ostream_lock); if (camel_exception_is_set (&ex)) { ret = FALSE; goto exit; diff --git a/camel/providers/imapx/camel-imapx-server.h b/camel/providers/imapx/camel-imapx-server.h index bd984e5..585018c 100644 --- a/camel/providers/imapx/camel-imapx-server.h +++ b/camel/providers/imapx/camel-imapx-server.h @@ -51,6 +51,7 @@ struct _CamelIMAPXServer { struct _CamelURL *url; struct _CamelIMAPXStream *stream; struct _capability_info *cinfo; + gboolean is_ssl_stream; CamelIMAPXNamespaceList *nsl; @@ -89,6 +90,7 @@ struct _CamelIMAPXServer { GSList *expunged; pthread_t parser_thread_id; + GStaticRecMutex ostream_lock; /* Idle */ struct _CamelIMAPXIdle *idle;