Fix FSF address (Tobias Mueller, #470445)
[platform/upstream/evolution-data-server.git] / libedataserver / e-msgport.c
1 /* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
2 /*
3  * Authors: Michael Zucchi <notzed@ximian.com>
4  *
5  * Copyright 2002 Ximian, Inc. (www.ximian.com)
6  *
7  * This program is free software; you can redistribute it and/or
8  * modify it under the terms of version 2 of the GNU Lesser General Public
9  * License as published by the Free Software Foundation.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this program; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  *
21  */
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <sys/time.h>
28 #include <sys/types.h>
29 #include <unistd.h>
30 #include <errno.h>
31 #include <string.h>
32 #include <stdio.h>
33
34 #include <pthread.h>
35
36 #include <glib.h>
37
38 #ifdef HAVE_NSS
39 #include <nspr.h>
40 #endif
41
42 #ifdef G_OS_WIN32
43 #include <winsock2.h>
44 #endif
45
46 #include "e-msgport.h"
47 #include "e-data-server-util.h"
48
49 #define m(x)                    /* msgport debug */
50 #define t(x)                    /* thread debug */
51 #define c(x)                    /* cache debug */
52
53 #ifdef G_OS_WIN32
54 #define E_CLOSE(socket) closesocket (socket)
55 #define E_READ(socket,buf,nbytes) recv(socket,buf,nbytes,0)
56 #define E_WRITE(socket,buf,nbytes) send(socket,buf,nbytes,0)
57 #define E_IS_SOCKET_ERROR(status) ((status) == SOCKET_ERROR)
58 #define E_IS_INVALID_SOCKET(socket) ((socket) == INVALID_SOCKET)
59 #define E_IS_STATUS_INTR() 0 /* No WSAEINTR errors in WinSock2  */
60 #else
61 #define E_CLOSE(socket) close (socket)
62 #define E_READ(socket,buf,nbytes) read(socket,buf,nbytes)
63 #define E_WRITE(socket,buf,nbytes) write(socket,buf,nbytes)
64 #define E_IS_SOCKET_ERROR(status) ((status) == -1)
65 #define E_IS_INVALID_SOCKET(socket) ((socket) < 0)
66 #define E_IS_STATUS_INTR() (errno == EINTR)
67 #endif
68
69 static int
70 e_pipe (int *fds)
71 {
72 #ifndef G_OS_WIN32
73         if (pipe (fds) != -1)
74                 return 0;
75         
76         fds[0] = -1;
77         fds[1] = -1;
78         
79         return -1;
80 #else
81         SOCKET temp, socket1 = -1, socket2 = -1;
82         struct sockaddr_in saddr;
83         int len;
84         u_long arg;
85         fd_set read_set, write_set;
86         struct timeval tv;
87
88         temp = socket (AF_INET, SOCK_STREAM, 0);
89         
90         if (temp == INVALID_SOCKET) {
91                 goto out0;
92         }
93         
94         arg = 1;
95         if (ioctlsocket (temp, FIONBIO, &arg) == SOCKET_ERROR) {
96                 goto out0;
97         }
98
99         memset (&saddr, 0, sizeof (saddr));
100         saddr.sin_family = AF_INET;
101         saddr.sin_port = 0;
102         saddr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
103
104         if (bind (temp, (struct sockaddr *)&saddr, sizeof (saddr))) {
105                 goto out0;
106         }
107
108         if (listen (temp, 1) == SOCKET_ERROR) {
109                 goto out0;
110         }
111
112         len = sizeof (saddr);
113         if (getsockname (temp, (struct sockaddr *)&saddr, &len)) {
114                 goto out0;
115         }
116
117         socket1 = socket (AF_INET, SOCK_STREAM, 0);
118         
119         if (socket1 == INVALID_SOCKET) {
120                 goto out0;
121         }
122
123         arg = 1;
124         if (ioctlsocket (socket1, FIONBIO, &arg) == SOCKET_ERROR) { 
125                 goto out1;
126         }
127
128         if (connect (socket1, (struct sockaddr  *)&saddr, len) != SOCKET_ERROR ||
129             WSAGetLastError () != WSAEWOULDBLOCK) {
130                 goto out1;
131         }
132
133         FD_ZERO (&read_set);
134         FD_SET (temp, &read_set);
135
136         tv.tv_sec = 0;
137         tv.tv_usec = 0;
138
139         if (select (0, &read_set, NULL, NULL, NULL) == SOCKET_ERROR) {
140                 goto out1;
141         }
142
143         if (!FD_ISSET (temp, &read_set)) {
144                 goto out1;
145         }
146
147         socket2 = accept (temp, (struct sockaddr *) &saddr, &len);
148         if (socket2 == INVALID_SOCKET) {
149                 goto out1;
150         }
151
152         FD_ZERO (&write_set);
153         FD_SET (socket1, &write_set);
154
155         tv.tv_sec = 0;
156         tv.tv_usec = 0;
157
158         if (select (0, NULL, &write_set, NULL, NULL) == SOCKET_ERROR) {
159                 goto out2;
160         }
161
162         if (!FD_ISSET (socket1, &write_set)) {
163                 goto out2;
164         }
165
166         arg = 0;
167         if (ioctlsocket (socket1, FIONBIO, &arg) == SOCKET_ERROR) {
168                 goto out2;
169         }
170
171         arg = 0;
172         if (ioctlsocket (socket2, FIONBIO, &arg) == SOCKET_ERROR) {
173                 goto out2;
174         }
175
176         fds[0] = socket1;
177         fds[1] = socket2;
178
179         closesocket (temp);
180
181         return 0;
182
183 out2:
184         closesocket (socket2);
185 out1:
186         closesocket (socket1);
187 out0:
188         closesocket (temp);
189         errno = EMFILE;         /* FIXME: use the real syscall errno? */
190         
191         fds[0] = -1;
192         fds[1] = -1;
193         
194         return -1;
195
196 #endif
197 }
198
199 void e_dlist_init(EDList *v)
200 {
201         v->head = (EDListNode *)&v->tail;
202         v->tail = 0;
203         v->tailpred = (EDListNode *)&v->head;
204 }
205
206 EDListNode *e_dlist_addhead(EDList *l, EDListNode *n)
207 {
208         n->next = l->head;
209         n->prev = (EDListNode *)&l->head;
210         l->head->prev = n;
211         l->head = n;
212         return n;
213 }
214
215 EDListNode *e_dlist_addtail(EDList *l, EDListNode *n)
216 {
217         n->next = (EDListNode *)&l->tail;
218         n->prev = l->tailpred;
219         l->tailpred->next = n;
220         l->tailpred = n;
221         return n;
222 }
223
224 EDListNode *e_dlist_remove(EDListNode *n)
225 {
226         n->next->prev = n->prev;
227         n->prev->next = n->next;
228         return n;
229 }
230
231 EDListNode *e_dlist_remhead(EDList *l)
232 {
233         EDListNode *n, *nn;
234
235         n = l->head;
236         nn = n->next;
237         if (nn) {
238                 nn->prev = n->prev;
239                 l->head = nn;
240                 return n;
241         }
242         return NULL;
243 }
244
245 EDListNode *e_dlist_remtail(EDList *l)
246 {
247         EDListNode *n, *np;
248
249         n = l->tailpred;
250         np = n->prev;
251         if (np) {
252                 np->next = n->next;
253                 l->tailpred = np;
254                 return n;
255         }
256         return NULL;
257 }
258
259 int e_dlist_empty(EDList *l)
260 {
261         return (l->head == (EDListNode *)&l->tail);
262 }
263
264 int e_dlist_length(EDList *l)
265 {
266         EDListNode *n, *nn;
267         int count = 0;
268
269         n = l->head;
270         nn = n->next;
271         while (nn) {
272                 count++;
273                 n = nn;
274                 nn = n->next;
275         }
276
277         return count;
278 }
279
280 struct _EMCache {
281         GMutex *lock;
282         GHashTable *key_table;
283         EDList lru_list;
284         size_t node_size;
285         int node_count;
286         time_t timeout;
287         GFreeFunc node_free;
288 };
289
290 /**
291  * em_cache_new:
292  * @timeout: 
293  * @nodesize: 
294  * @nodefree: 
295  * 
296  * Setup a new timeout cache.  @nodesize is the size of nodes in the
297  * cache, and @nodefree will be called to free YOUR content.
298  * 
299  * Return value: 
300  **/
301 EMCache *
302 em_cache_new(time_t timeout, size_t nodesize, GFreeFunc nodefree)
303 {
304         struct _EMCache *emc;
305
306         emc = g_malloc0(sizeof(*emc));
307         emc->node_size = nodesize;
308         emc->key_table = g_hash_table_new(g_str_hash, g_str_equal);
309         emc->node_free = nodefree;
310         e_dlist_init(&emc->lru_list);
311         emc->lock = g_mutex_new();
312         emc->timeout = timeout;
313
314         return emc;
315 }
316
317 /**
318  * em_cache_destroy:
319  * @emc: 
320  * 
321  * destroy the cache, duh.
322  **/
323 void
324 em_cache_destroy(EMCache *emc)
325 {
326         em_cache_clear(emc);
327         g_mutex_free(emc->lock);
328         g_free(emc);
329 }
330
331 /**
332  * em_cache_lookup:
333  * @emc: 
334  * @key: 
335  * 
336  * Lookup a cache node.  once you're finished with it, you need to
337  * unref it.
338  * 
339  * Return value: 
340  **/
341 EMCacheNode *
342 em_cache_lookup(EMCache *emc, const char *key)
343 {
344         EMCacheNode *n;
345
346         g_mutex_lock(emc->lock);
347         n = g_hash_table_lookup(emc->key_table, key);
348         if (n) {
349                 e_dlist_remove((EDListNode *)n);
350                 e_dlist_addhead(&emc->lru_list, (EDListNode *)n);
351                 n->stamp = time(0);
352                 n->ref_count++;
353         }
354         g_mutex_unlock(emc->lock);
355
356         c(printf("looking up '%s' %s\n", key, n?"found":"not found"));
357
358         return n;
359 }
360
361 /**
362  * em_cache_node_new:
363  * @emc: 
364  * @key: 
365  * 
366  * Create a new key'd cache node.  The node will not be added to the
367  * cache until you insert it.
368  * 
369  * Return value: 
370  **/
371 EMCacheNode *
372 em_cache_node_new(EMCache *emc, const char *key)
373 {
374         EMCacheNode *n;
375
376         /* this could use memchunks, but its probably overkill */
377         n = g_malloc0(emc->node_size);
378         n->key = g_strdup(key);
379
380         return n;
381 }
382
383 /**
384  * em_cache_node_unref:
385  * @emc: 
386  * @n: 
387  * 
388  * unref a cache node, you can only unref nodes which have been looked
389  * up.
390  **/
391 void
392 em_cache_node_unref(EMCache *emc, EMCacheNode *n)
393 {
394         g_mutex_lock(emc->lock);
395         g_assert(n->ref_count > 0);
396         n->ref_count--;
397         g_mutex_unlock(emc->lock);
398 }
399
400 /**
401  * em_cache_add:
402  * @emc: 
403  * @n: 
404  * 
405  * Add a cache node to the cache, once added the memory is owned by
406  * the cache.  If there are conflicts and the old node is still in
407  * use, then the new node is not added, otherwise it is added and any
408  * nodes older than the expire time are flushed.
409  **/
410 void
411 em_cache_add(EMCache *emc, EMCacheNode *n)
412 {
413         EMCacheNode *old, *prev;
414         EDList old_nodes;
415
416         e_dlist_init(&old_nodes);
417
418         g_mutex_lock(emc->lock);
419         old = g_hash_table_lookup(emc->key_table, n->key);
420         if (old != NULL) {
421                 if (old->ref_count == 0) {
422                         g_hash_table_remove(emc->key_table, old->key);
423                         e_dlist_remove((EDListNode *)old);
424                         e_dlist_addtail(&old_nodes, (EDListNode *)old);
425                         goto insert;
426                 } else {
427                         e_dlist_addtail(&old_nodes, (EDListNode *)n);
428                 }
429         } else {
430                 time_t now;
431         insert:
432                 now = time(0);
433                 g_hash_table_insert(emc->key_table, n->key, n);
434                 e_dlist_addhead(&emc->lru_list, (EDListNode *)n);
435                 n->stamp = now;
436                 emc->node_count++;
437
438                 c(printf("inserting node %s\n", n->key));
439
440                 old = (EMCacheNode *)emc->lru_list.tailpred;
441                 prev = old->prev;
442                 while (prev && old->stamp < now - emc->timeout) {
443                         if (old->ref_count == 0) {
444                                 c(printf("expiring node %s\n", old->key));
445                                 g_hash_table_remove(emc->key_table, old->key);
446                                 e_dlist_remove((EDListNode *)old);
447                                 e_dlist_addtail(&old_nodes, (EDListNode *)old);
448                         }
449                         old = prev;
450                         prev = prev->prev;
451                 }
452         }
453
454         g_mutex_unlock(emc->lock);
455
456         while ((old = (EMCacheNode *)e_dlist_remhead(&old_nodes))) {
457                 emc->node_free(old);
458                 g_free(old->key);
459                 g_free(old);
460         }
461 }
462
463 /**
464  * em_cache_clear:
465  * @emc: 
466  * 
467  * clear the cache.  just for api completeness.
468  **/
469 void
470 em_cache_clear(EMCache *emc)
471 {
472         EMCacheNode *node;
473         EDList old_nodes;
474
475         e_dlist_init(&old_nodes);
476         g_mutex_lock(emc->lock);
477         while ((node = (EMCacheNode *)e_dlist_remhead(&emc->lru_list)))
478                 e_dlist_addtail(&old_nodes, (EDListNode *)node);
479         g_mutex_unlock(emc->lock);
480
481         while ((node = (EMCacheNode *)e_dlist_remhead(&old_nodes))) {
482                 emc->node_free(node);
483                 g_free(node->key);
484                 g_free(node);
485         }
486 }
487
488 struct _EMsgPort {
489         GAsyncQueue *queue;
490         gint pipe[2];  /* on Win32, actually a pair of SOCKETs */
491 #ifdef HAVE_NSS
492         PRFileDesc *prpipe[2];
493 #endif
494 };
495
496 /* message flags */
497 enum {
498         MSG_FLAG_SYNC_WITH_PIPE    = 1 << 0,
499         MSG_FLAG_SYNC_WITH_PR_PIPE = 1 << 1
500 };
501
502 #ifdef HAVE_NSS
503 static int
504 e_prpipe (PRFileDesc **fds)
505 {
506 #ifdef G_OS_WIN32
507         if (PR_NewTCPSocketPair (fds) != PR_FAILURE)
508                 return 0;
509 #else
510         if (PR_CreatePipe (&fds[0], &fds[1]) != PR_FAILURE)
511                 return 0;
512 #endif
513         
514         fds[0] = NULL;
515         fds[1] = NULL;
516         
517         return -1;
518 }
519 #endif
520
521 static void
522 msgport_sync_with_pipe (gint fd)
523 {
524         gchar buffer[1];
525
526         while (fd >= 0) {
527                 if (E_READ (fd, buffer, 1) > 0)
528                         break;
529                 else if (!E_IS_STATUS_INTR ()) {
530                         g_warning ("%s: Failed to read from pipe: %s",
531                                 G_STRFUNC, g_strerror (errno));
532                         break;
533                 }
534         }
535 }
536
537 #ifdef HAVE_NSS
538 static void
539 msgport_sync_with_prpipe (PRFileDesc *prfd)
540 {
541         gchar buffer[1];
542
543         while (prfd != NULL) {
544                 if (PR_Read (prfd, buffer, 1) > 0)
545                         break;
546                 else if (PR_GetError () != PR_PENDING_INTERRUPT_ERROR) {
547                         gchar *text = g_alloca (PR_GetErrorTextLength ());
548                         PR_GetErrorText (text);
549                         g_warning ("%s: Failed to read from NSPR pipe: %s",
550                                 G_STRFUNC, text);
551                         break;
552                 }
553         }
554 }
555 #endif
556
557 EMsgPort *
558 e_msgport_new (void)
559 {
560         EMsgPort *msgport;
561
562         msgport = g_slice_new (EMsgPort);
563         msgport->queue = g_async_queue_new ();
564         msgport->pipe[0] = -1;
565         msgport->pipe[1] = -1;
566 #ifdef HAVE_NSS
567         msgport->prpipe[0] = NULL;
568         msgport->prpipe[1] = NULL;
569 #endif
570
571         return msgport;
572 }
573
574 void
575 e_msgport_destroy (EMsgPort *msgport)
576 {
577         g_return_if_fail (msgport != NULL);
578
579         if (msgport->pipe[0] >= 0) {
580                 E_CLOSE (msgport->pipe[0]);
581                 E_CLOSE (msgport->pipe[1]);
582         }
583 #ifdef HAVE_NSS
584         if (msgport->prpipe[0] != NULL) {
585                 PR_Close (msgport->prpipe[0]);
586                 PR_Close (msgport->prpipe[1]);
587         }
588 #endif
589
590         g_async_queue_unref (msgport->queue);
591         g_slice_free (EMsgPort, msgport);
592 }
593
594 int
595 e_msgport_fd (EMsgPort *msgport)
596 {
597         gint fd;
598
599         g_return_val_if_fail (msgport != NULL, -1);
600
601         g_async_queue_lock (msgport->queue);
602         fd = msgport->pipe[0];
603         if (fd < 0 && e_pipe (msgport->pipe) == 0)
604                 fd = msgport->pipe[0];
605         g_async_queue_unlock (msgport->queue);
606
607         return fd;
608 }
609
610 #ifdef HAVE_NSS
611 PRFileDesc *
612 e_msgport_prfd (EMsgPort *msgport)
613 {
614         PRFileDesc *prfd;
615
616         g_return_val_if_fail (msgport != NULL, NULL);
617
618         g_async_queue_lock (msgport->queue);
619         prfd = msgport->prpipe[0];
620         if (prfd == NULL && e_prpipe (msgport->prpipe) == 0)
621                 prfd = msgport->prpipe[0];
622         g_async_queue_unlock (msgport->queue);
623
624         return prfd;
625 }
626 #endif
627
628 void
629 e_msgport_put (EMsgPort *msgport, EMsg *msg)
630 {
631         gint fd;
632 #ifdef HAVE_NSS
633         PRFileDesc *prfd;
634 #endif
635
636         g_return_if_fail (msgport != NULL);
637         g_return_if_fail (msg != NULL);
638
639         g_async_queue_lock (msgport->queue);
640
641         msg->flags = 0;
642
643         fd = msgport->pipe[1];
644         while (fd >= 0) {
645                 if (E_WRITE (fd, "E", 1) > 0) {
646                         msg->flags |= MSG_FLAG_SYNC_WITH_PIPE;
647                         break;
648                 } else if (!E_IS_STATUS_INTR ()) {
649                         g_warning ("%s: Failed to write to pipe: %s",
650                                 G_STRFUNC, g_strerror (errno));
651                         break;
652                 }
653         }
654
655 #ifdef HAVE_NSS
656         prfd = msgport->prpipe[1];
657         while (prfd != NULL) {
658                 if (PR_Write (prfd, "E", 1) > 0) {
659                         msg->flags |= MSG_FLAG_SYNC_WITH_PR_PIPE;
660                         break;
661                 } else if (PR_GetError () != PR_PENDING_INTERRUPT_ERROR) {
662                         gchar *text = g_alloca (PR_GetErrorTextLength ());
663                         PR_GetErrorText (text);
664                         g_warning ("%s: Failed to write to NSPR pipe: %s",
665                                 G_STRFUNC, text);
666                         break;
667                 }
668         }
669 #endif
670
671         g_async_queue_push_unlocked (msgport->queue, msg);
672         g_async_queue_unlock (msgport->queue);
673 }
674
675 EMsg *
676 e_msgport_wait (EMsgPort *msgport)
677 {
678         EMsg *msg;
679
680         g_return_val_if_fail (msgport != NULL, NULL);
681
682         g_async_queue_lock (msgport->queue);
683
684         msg = g_async_queue_pop_unlocked (msgport->queue);
685
686         g_assert (msg != NULL);
687
688         if (msg->flags & MSG_FLAG_SYNC_WITH_PIPE)
689                 msgport_sync_with_pipe (msgport->pipe[0]);
690 #ifdef HAVE_NSS
691         if (msg->flags & MSG_FLAG_SYNC_WITH_PR_PIPE)
692                 msgport_sync_with_prpipe (msgport->prpipe[0]);
693 #endif
694
695         g_async_queue_unlock (msgport->queue);
696
697         return msg;
698 }
699
700 EMsg *
701 e_msgport_get (EMsgPort *msgport)
702 {
703         EMsg *msg;
704
705         g_return_val_if_fail (msgport != NULL, NULL);
706
707         g_async_queue_lock (msgport->queue);
708
709         msg = g_async_queue_try_pop_unlocked (msgport->queue);
710
711         if (msg != NULL && msg->flags & MSG_FLAG_SYNC_WITH_PIPE)
712                 msgport_sync_with_pipe (msgport->pipe[0]);
713 #ifdef HAVE_NSS
714         if (msg != NULL && msg->flags & MSG_FLAG_SYNC_WITH_PR_PIPE)
715                 msgport_sync_with_prpipe (msgport->prpipe[0]);
716 #endif
717
718         g_async_queue_unlock (msgport->queue);
719
720         return msg;
721 }
722
723 void
724 e_msgport_reply (EMsg *msg)
725 {
726         g_return_if_fail (msg != NULL);
727
728         if (msg->reply_port)
729                 e_msgport_put (msg->reply_port, msg);
730
731         /* else lost? */
732 }
733
734 #ifndef EDS_DISABLE_DEPRECATED
735
736 struct _thread_info {
737         pthread_t id;
738         int busy;
739 };
740
741 struct _EThread {
742         struct _EThread *next;
743         struct _EThread *prev;
744
745         EMsgPort *server_port;
746         EMsgPort *reply_port;
747         pthread_mutex_t mutex;
748         e_thread_t type;
749         int queue_limit;
750
751         int waiting;            /* if we are waiting for a new message, count of waiting processes */
752         pthread_t id;           /* our running child thread */
753         int have_thread;
754         GList *id_list;         /* if THREAD_NEW, then a list of our child threads in thread_info structs */
755
756         EThreadFunc destroy;
757         void *destroy_data;
758
759         EThreadFunc received;
760         void *received_data;
761
762         EThreadFunc lost;
763         void *lost_data;
764 };
765
766 /* All active threads */
767 static EDList ethread_list = E_DLIST_INITIALISER(ethread_list);
768 static pthread_mutex_t ethread_lock = PTHREAD_MUTEX_INITIALIZER;
769
770 #define E_THREAD_QUIT_REPLYPORT ((struct _EMsgPort *)~0)
771
772 static void thread_destroy_msg(EThread *e, EMsg *m);
773
774 static struct _thread_info *thread_find(EThread *e, pthread_t id)
775 {
776         GList *node;
777         struct _thread_info *info;
778
779         node = e->id_list;
780         while (node) {
781                 info = node->data;
782                 if (pthread_equal (info->id, id))
783                         return info;
784                 node = node->next;
785         }
786         return NULL;
787 }
788
789 #if 0
790 static void thread_remove(EThread *e, pthread_t id)
791 {
792         GList *node;
793         struct _thread_info *info;
794
795         node = e->id_list;
796         while (node) {
797                 info = node->data;
798                 if (pthread_equal (info->id, id)) {
799                         e->id_list = g_list_remove(e->id_list, info);
800                         g_free(info);
801                 }
802                 node = node->next;
803         }
804 }
805 #endif
806
807 EThread *e_thread_new(e_thread_t type)
808 {
809         EThread *e;
810
811         e = g_malloc0(sizeof(*e));
812         pthread_mutex_init(&e->mutex, 0);
813         e->type = type;
814         e->server_port = e_msgport_new();
815         e->have_thread = FALSE;
816         e->queue_limit = INT_MAX;
817
818         pthread_mutex_lock(&ethread_lock);
819         e_dlist_addtail(&ethread_list, (EDListNode *)e);
820         pthread_mutex_unlock(&ethread_lock);
821
822         return e;
823 }
824
825 /* close down the threads & resources etc */
826 void e_thread_destroy(EThread *e)
827 {
828         int busy = FALSE;
829         EMsg *msg;
830         struct _thread_info *info;
831         GList *l;
832
833         /* make sure we soak up all the messages first */
834         while ( (msg = e_msgport_get(e->server_port)) ) {
835                 thread_destroy_msg(e, msg);
836         }
837
838         pthread_mutex_lock(&e->mutex);
839
840         switch(e->type) {
841         case E_THREAD_QUEUE:
842         case E_THREAD_DROP:
843                 /* if we have a thread, 'kill' it */
844                 if (e->have_thread) {
845                         pthread_t id = e->id;
846                         t(printf("Sending thread '%" G_GUINT64_FORMAT "' quit message\n", e_util_pthread_id(id)));
847
848                         e->have_thread = FALSE;
849
850                         msg = g_malloc0(sizeof(*msg));
851                         msg->reply_port = E_THREAD_QUIT_REPLYPORT;
852                         e_msgport_put(e->server_port, msg);
853
854                         pthread_mutex_unlock(&e->mutex);
855                         t(printf("Joining thread '%" G_GUINT64_FORMAT "'\n", e_util_pthread_id(id)));
856                         pthread_join(id, 0);
857                         t(printf("Joined thread '%" G_GUINT64_FORMAT "'!\n", e_util_pthread_id(id)));
858                         pthread_mutex_lock(&e->mutex);
859                 }
860                 busy = e->have_thread;
861                 break;
862         case E_THREAD_NEW:
863                 /* first, send everyone a quit message */
864                 l = e->id_list;
865                 while (l) {
866                         info = l->data;
867                         t(printf("Sending thread '%" G_GUINT64_FORMAT "' quit message\n", e_util_pthread_id(info->id)));
868                         msg = g_malloc0(sizeof(*msg));
869                         msg->reply_port = E_THREAD_QUIT_REPLYPORT;
870                         e_msgport_put(e->server_port, msg);
871                         l = l->next;                    
872                 }
873
874                 /* then, wait for everyone to quit */
875                 while (e->id_list) {
876                         info = e->id_list->data;
877                         e->id_list = g_list_remove(e->id_list, info);
878                         pthread_mutex_unlock(&e->mutex);
879                         t(printf("Joining thread '%" G_GUINT64_FORMAT "'\n", e_util_pthread_id(info->id)));
880                         pthread_join(info->id, 0);
881                         t(printf("Joined thread '%" G_GUINT64_FORMAT "'!\n", e_util_pthread_id(info->id)));
882                         pthread_mutex_lock(&e->mutex);
883                         g_free(info);
884                 }
885                 busy = g_list_length(e->id_list) != 0;
886                 break;
887         }
888
889         pthread_mutex_unlock(&e->mutex);
890
891         /* and clean up, if we can */
892         if (busy) {
893                 g_warning("threads were busy, leaked EThread");
894                 return;
895         }
896
897         pthread_mutex_lock(&ethread_lock);
898         e_dlist_remove((EDListNode *)e);
899         pthread_mutex_unlock(&ethread_lock);
900
901         pthread_mutex_destroy(&e->mutex);
902         e_msgport_destroy(e->server_port);
903         g_free(e);
904 }
905
906 /* set the queue maximum depth, what happens when the queue
907    fills up depends on the queue type */
908 void e_thread_set_queue_limit(EThread *e, int limit)
909 {
910         e->queue_limit = limit;
911 }
912
913 /* set a msg destroy callback, this can not call any e_thread functions on @e */
914 void e_thread_set_msg_destroy(EThread *e, EThreadFunc destroy, void *data)
915 {
916         pthread_mutex_lock(&e->mutex);
917         e->destroy = destroy;
918         e->destroy_data = data;
919         pthread_mutex_unlock(&e->mutex);
920 }
921
922 /* set a message lost callback, called if any message is discarded */
923 void e_thread_set_msg_lost(EThread *e, EThreadFunc lost, void *data)
924 {
925         pthread_mutex_lock(&e->mutex);
926         e->lost = lost;
927         e->lost_data = lost;
928         pthread_mutex_unlock(&e->mutex);
929 }
930
931 /* set a reply port, if set, then send messages back once finished */
932 void e_thread_set_reply_port(EThread *e, EMsgPort *reply_port)
933 {
934         e->reply_port = reply_port;
935 }
936
937 /* set a received data callback */
938 void e_thread_set_msg_received(EThread *e, EThreadFunc received, void *data)
939 {
940         pthread_mutex_lock(&e->mutex);
941         e->received = received;
942         e->received_data = data;
943         pthread_mutex_unlock(&e->mutex);
944 }
945
946 /* find out if we're busy doing any work, e==NULL, check for all work */
947 int e_thread_busy(EThread *e)
948 {
949         int busy = FALSE;
950
951         if (e == NULL) {
952                 pthread_mutex_lock(&ethread_lock);
953                 e = (EThread *)ethread_list.head;
954                 while (e->next && !busy) {
955                         busy = e_thread_busy(e);
956                         e = e->next;
957                 }
958                 pthread_mutex_unlock(&ethread_lock);
959         } else {
960                 pthread_mutex_lock(&e->mutex);
961                 switch (e->type) {
962                 case E_THREAD_QUEUE:
963                 case E_THREAD_DROP:
964                         busy = e->waiting != 1 && e->have_thread;
965                         break;
966                 case E_THREAD_NEW:
967                         busy = e->waiting != g_list_length(e->id_list);
968                         break;
969                 }
970                 pthread_mutex_unlock(&e->mutex);
971         }
972
973         return busy;
974 }
975
976 static void
977 thread_destroy_msg(EThread *e, EMsg *m)
978 {
979         EThreadFunc func;
980         void *func_data;
981
982         /* we do this so we never get an incomplete/unmatched callback + data */
983         pthread_mutex_lock(&e->mutex);
984         func = e->destroy;
985         func_data = e->destroy_data;
986         pthread_mutex_unlock(&e->mutex);
987         
988         if (func)
989                 func(e, m, func_data);
990 }
991
992 static void
993 thread_received_msg(EThread *e, EMsg *m)
994 {
995         EThreadFunc func;
996         void *func_data;
997
998         /* we do this so we never get an incomplete/unmatched callback + data */
999         pthread_mutex_lock(&e->mutex);
1000         func = e->received;
1001         func_data = e->received_data;
1002         pthread_mutex_unlock(&e->mutex);
1003         
1004         if (func)
1005                 func(e, m, func_data);
1006         else
1007                 g_warning("No processing callback for EThread, message unprocessed");
1008 }
1009
1010 static void
1011 thread_lost_msg(EThread *e, EMsg *m)
1012 {
1013         EThreadFunc func;
1014         void *func_data;
1015
1016         /* we do this so we never get an incomplete/unmatched callback + data */
1017         pthread_mutex_lock(&e->mutex);
1018         func = e->lost;
1019         func_data = e->lost_data;
1020         pthread_mutex_unlock(&e->mutex);
1021         
1022         if (func)
1023                 func(e, m, func_data);
1024 }
1025
1026 /* the actual thread dispatcher */
1027 static void *
1028 thread_dispatch(void *din)
1029 {
1030         EThread *e = din;
1031         EMsg *m;
1032         struct _thread_info *info;
1033         pthread_t self = pthread_self();
1034
1035         t(printf("dispatch thread started: %" G_GUINT64_FORMAT "\n", e_util_pthread_id(self)));
1036
1037         while (1) {
1038                 pthread_mutex_lock(&e->mutex);
1039                 m = e_msgport_get(e->server_port);
1040                 if (m == NULL) {
1041                         /* nothing to do?  If we are a 'new' type thread, just quit.
1042                            Otherwise, go into waiting (can be cancelled here) */
1043                         info = thread_find(e, self);
1044                         if (info)
1045                                 info->busy = FALSE;
1046                         e->waiting++;
1047                         pthread_mutex_unlock(&e->mutex);
1048                         m = e_msgport_wait(e->server_port);
1049                         pthread_mutex_lock(&e->mutex);
1050                         e->waiting--;
1051                 }
1052
1053                 if (m->reply_port == E_THREAD_QUIT_REPLYPORT) {
1054                         t(printf("Thread %" G_GUINT64_FORMAT " got quit message\n", e_util_pthread_id(self)));
1055                         /* Handle a quit message, say we're quitting, free the message, and break out of the loop */
1056                         info = thread_find(e, self);
1057                         if (info)
1058                                 info->busy = 2;
1059                         pthread_mutex_unlock(&e->mutex);
1060                         g_free(m);
1061                         break;
1062                 } else {
1063                         info = thread_find(e, self);
1064                         if (info)
1065                                 info->busy = TRUE;
1066                 }
1067                 pthread_mutex_unlock(&e->mutex);
1068
1069                 t(printf("got message in dispatch thread\n"));
1070
1071                 /* process it */
1072                 thread_received_msg(e, m);
1073
1074                 /* if we have a reply port, send it back, otherwise, lose it */
1075                 if (m->reply_port) {
1076                         e_msgport_reply(m);
1077                 } else {
1078                         thread_destroy_msg(e, m);
1079                 }
1080         }
1081
1082         return NULL;
1083 }
1084
1085 /* send a message to the thread, start thread if necessary */
1086 void e_thread_put(EThread *e, EMsg *msg)
1087 {
1088         pthread_t id;
1089         EMsg *dmsg = NULL;
1090
1091         pthread_mutex_lock(&e->mutex);
1092
1093         /* the caller forgot to tell us what to do, well, we can't do anything can we */
1094         if (e->received == NULL) {
1095                 pthread_mutex_unlock(&e->mutex);
1096                 g_warning("EThread called with no receiver function, no work to do!");
1097                 thread_destroy_msg(e, msg);
1098                 return;
1099         }
1100
1101         msg->reply_port = e->reply_port;
1102
1103         switch(e->type) {
1104         case E_THREAD_QUEUE:
1105                 /* if the queue is full, lose this new addition */
1106                 if (g_async_queue_length(e->server_port->queue) < e->queue_limit) {
1107                         e_msgport_put(e->server_port, msg);
1108                 } else {
1109                         printf("queue limit reached, dropping new message\n");
1110                         dmsg = msg;
1111                 }
1112                 break;
1113         case E_THREAD_DROP:
1114                 /* if the queue is full, lose the oldest (unprocessed) message */
1115                 if (g_async_queue_length(e->server_port->queue) < e->queue_limit) {
1116                         e_msgport_put(e->server_port, msg);
1117                 } else {
1118                         printf("queue limit reached, dropping old message\n");
1119                         e_msgport_put(e->server_port, msg);
1120                         dmsg = e_msgport_get(e->server_port);
1121                 }
1122                 break;
1123         case E_THREAD_NEW:
1124                 /* it is possible that an existing thread can catch this message, so
1125                    we might create a thread with no work to do.
1126                    but that doesn't matter, the other alternative that it be lost is worse */
1127                 e_msgport_put(e->server_port, msg);
1128                 if (e->waiting == 0
1129                     && g_list_length(e->id_list) < e->queue_limit
1130                     && pthread_create(&id, NULL, thread_dispatch, e) == 0) {
1131                         struct _thread_info *info = g_malloc0(sizeof(*info));
1132                         t(printf("created NEW thread %" G_GUINT64_FORMAT "\n", e_util_pthread_id(id)));
1133                         info->id = id;
1134                         info->busy = TRUE;
1135                         e->id_list = g_list_append(e->id_list, info);
1136                 }
1137                 pthread_mutex_unlock(&e->mutex);
1138                 return;
1139         }
1140
1141         /* create the thread, if there is none to receive it yet */
1142         if (!e->have_thread) {
1143                 int err;
1144
1145                 if ((err = pthread_create(&e->id, NULL, thread_dispatch, e)) != 0) {
1146                         g_warning("Could not create dispatcher thread, message queued?: %s", strerror(err));
1147                 } else {
1148                         e->have_thread = TRUE;
1149                 }
1150         }
1151
1152         pthread_mutex_unlock(&e->mutex);
1153
1154         if (dmsg) {
1155                 thread_lost_msg(e, dmsg);
1156                 thread_destroy_msg(e, dmsg);
1157         }
1158 }
1159 #endif     /* EDS_DISABLE_DEPRECATED */
1160
1161 /* yet-another-mutex interface */
1162 struct _EMutex {
1163         int type;
1164         pthread_t owner;
1165         int have_owner;
1166         short waiters;
1167         short depth;
1168         pthread_mutex_t mutex;
1169         pthread_cond_t cond;
1170 };
1171
1172 /* sigh, this is just painful to have to need, but recursive
1173    read/write, etc mutexes just aren't very common in thread
1174    implementations */
1175 /* TODO: Just make it use recursive mutexes if they are available */
1176 EMutex *e_mutex_new(e_mutex_t type)
1177 {
1178         struct _EMutex *m;
1179
1180         m = g_malloc(sizeof(*m));
1181         m->type = type;
1182         m->waiters = 0;
1183         m->depth = 0;
1184         m->have_owner = FALSE;
1185
1186         switch (type) {
1187         case E_MUTEX_SIMPLE:
1188                 pthread_mutex_init(&m->mutex, 0);
1189                 break;
1190         case E_MUTEX_REC:
1191                 pthread_mutex_init(&m->mutex, 0);
1192                 pthread_cond_init(&m->cond, 0);
1193                 break;
1194                 /* read / write ?  flags for same? */
1195         }
1196
1197         return m;
1198 }
1199
1200 int e_mutex_destroy(EMutex *m)
1201 {
1202         int ret = 0;
1203
1204         switch (m->type) {
1205         case E_MUTEX_SIMPLE:
1206                 ret = pthread_mutex_destroy(&m->mutex);
1207                 if (ret == -1)
1208                         g_warning("EMutex destroy failed: %s", strerror(errno));
1209                 g_free(m);
1210                 break;
1211         case E_MUTEX_REC:
1212                 ret = pthread_mutex_destroy(&m->mutex);
1213                 if (ret == -1)
1214                         g_warning("EMutex destroy failed: %s", strerror(errno));
1215                 ret = pthread_cond_destroy(&m->cond);
1216                 if (ret == -1)
1217                         g_warning("EMutex destroy failed: %s", strerror(errno));
1218                 g_free(m);
1219
1220         }
1221         return ret;
1222 }
1223
1224 int e_mutex_lock(EMutex *m)
1225 {
1226         pthread_t id;
1227         int err;
1228
1229         switch (m->type) {
1230         case E_MUTEX_SIMPLE:
1231                 return pthread_mutex_lock(&m->mutex);
1232         case E_MUTEX_REC:
1233                 id = pthread_self();
1234                 if ((err = pthread_mutex_lock(&m->mutex)) != 0)
1235                         return err;
1236                 while (1) {
1237                         if (!m->have_owner) {
1238                                 m->owner = id;
1239                                 m->have_owner = TRUE;
1240                                 m->depth = 1;
1241                                 break;
1242                         } else if (pthread_equal (id, m->owner)) {
1243                                 m->depth++;
1244                                 break;
1245                         } else {
1246                                 m->waiters++;
1247                                 if ((err = pthread_cond_wait(&m->cond, &m->mutex)) != 0)
1248                                         return err;
1249                                 m->waiters--;
1250                         }
1251                 }
1252                 return pthread_mutex_unlock(&m->mutex);
1253         }
1254
1255         return EINVAL;
1256 }
1257
1258 int e_mutex_unlock(EMutex *m)
1259 {
1260         int err;
1261
1262         switch (m->type) {
1263         case E_MUTEX_SIMPLE:
1264                 return pthread_mutex_unlock(&m->mutex);
1265         case E_MUTEX_REC:
1266                 if ((err = pthread_mutex_lock(&m->mutex)) != 0)
1267                         return err;
1268                 g_assert(m->have_owner && pthread_equal(m->owner, pthread_self()));
1269
1270                 m->depth--;
1271                 if (m->depth == 0) {
1272                         m->have_owner = FALSE;
1273                         if (m->waiters > 0)
1274                                 pthread_cond_signal(&m->cond);
1275                 }
1276                 return pthread_mutex_unlock(&m->mutex);
1277         }
1278
1279         errno = EINVAL;
1280         return -1;
1281 }
1282
1283 void e_mutex_assert_locked(EMutex *m)
1284 {
1285         g_return_if_fail (m->type == E_MUTEX_REC);
1286         pthread_mutex_lock(&m->mutex);
1287         g_assert(m->have_owner && pthread_equal(m->owner, pthread_self()));
1288         pthread_mutex_unlock(&m->mutex);
1289 }
1290
1291 int e_mutex_cond_wait(void *vcond, EMutex *m)
1292 {
1293         int ret;
1294         pthread_cond_t *cond = vcond;
1295
1296         switch(m->type) {
1297         case E_MUTEX_SIMPLE:
1298                 return pthread_cond_wait(cond, &m->mutex);
1299         case E_MUTEX_REC:
1300                 if ((ret = pthread_mutex_lock(&m->mutex)) != 0)
1301                         return ret;
1302                 g_assert(m->have_owner && pthread_equal(m->owner, pthread_self()));
1303                 ret = pthread_cond_wait(cond, &m->mutex);
1304                 g_assert(m->have_owner && pthread_equal(m->owner, pthread_self()));
1305                 pthread_mutex_unlock(&m->mutex);
1306                 return ret;
1307         default:
1308                 g_return_val_if_reached(-1);
1309         }
1310 }
1311
1312 #ifdef STANDALONE
1313
1314 static EMsgPort *server_port;
1315
1316 void *fdserver(void *data)
1317 {
1318         int fd;
1319         EMsg *msg;
1320         int id = (int)data;
1321         fd_set rfds;
1322
1323         fd = e_msgport_fd(server_port);
1324
1325         while (1) {
1326                 int count = 0;
1327
1328                 printf("server %d: waiting on fd %d\n", id, fd);
1329                 FD_ZERO(&rfds);
1330                 FD_SET(fd, &rfds);
1331                 select(fd+1, &rfds, NULL, NULL, NULL);
1332                 printf("server %d: Got async notification, checking for messages\n", id);
1333                 while ((msg = e_msgport_get(server_port))) {
1334                         printf("server %d: got message\n", id);
1335                         g_usleep(1000000);
1336                         printf("server %d: replying\n", id);
1337                         e_msgport_reply(msg);
1338                         count++;
1339                 }
1340                 printf("server %d: got %d messages\n", id, count);
1341         }
1342 }
1343
1344 void *server(void *data)
1345 {
1346         EMsg *msg;
1347         int id = (int)data;
1348
1349         while (1) {
1350                 printf("server %d: waiting\n", id);
1351                 msg = e_msgport_wait(server_port);
1352                 if (msg) {
1353                         printf("server %d: got message\n", id);
1354                         g_usleep(1000000);
1355                         printf("server %d: replying\n", id);
1356                         e_msgport_reply(msg);
1357                 } else {
1358                         printf("server %d: didn't get message\n", id);
1359                 }
1360         }
1361         return NULL;
1362 }
1363
1364 void *client(void *data)
1365 {
1366         EMsg *msg;
1367         EMsgPort *replyport;
1368         int i;
1369
1370         replyport = e_msgport_new();
1371         msg = g_malloc0(sizeof(*msg));
1372         msg->reply_port = replyport;
1373         for (i=0;i<10;i++) {
1374                 /* synchronous operation */
1375                 printf("client: sending\n");
1376                 e_msgport_put(server_port, msg);
1377                 printf("client: waiting for reply\n");
1378                 e_msgport_wait(replyport);
1379                 printf("client: got reply\n");
1380         }
1381
1382         printf("client: sleeping ...\n");
1383         g_usleep(2000000);
1384         printf("client: sending multiple\n");
1385
1386         for (i=0;i<10;i++) {
1387                 msg = g_malloc0(sizeof(*msg));
1388                 msg->reply_port = replyport;
1389                 e_msgport_put(server_port, msg);
1390         }
1391
1392         printf("client: receiving multiple\n");
1393         for (i=0;i<10;i++) {
1394                 msg = e_msgport_wait(replyport);
1395                 g_free(msg);
1396         }
1397
1398         printf("client: done\n");
1399         return NULL;
1400 }
1401
1402 int main(int argc, char **argv)
1403 {
1404         pthread_t serverid, clientid;
1405
1406         g_thread_init(NULL);
1407
1408 #ifdef G_OS_WIN32
1409         {
1410                 WSADATA wsadata;
1411                 if (WSAStartup (MAKEWORD (2, 0), &wsadata) != 0)
1412                         g_error ("Windows Sockets could not be initialized");
1413         }
1414 #endif
1415
1416         server_port = e_msgport_new();
1417
1418         /*pthread_create(&serverid, NULL, server, (void *)1);*/
1419         pthread_create(&serverid, NULL, fdserver, (void *)1);
1420         pthread_create(&clientid, NULL, client, NULL);
1421
1422         g_usleep(60000000);
1423
1424         return 0;
1425 }
1426 #endif