1 /* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
3 * Authors: Michael Zucchi <notzed@ximian.com>
5 * Copyright 2002 Ximian, Inc. (www.ximian.com)
7 * This program is free software; you can redistribute it and/or
8 * modify it under the terms of version 2 of the GNU Lesser General Public
9 * License as published by the Free Software Foundation.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this program; if not, write to the
18 * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
19 * Boston, MA 02110-1301, USA.
28 #include <sys/types.h>
46 #include "e-msgport.h"
47 #include "e-data-server-util.h"
49 #define m(x) /* msgport debug */
50 #define t(x) /* thread debug */
51 #define c(x) /* cache debug */
54 #define E_CLOSE(socket) closesocket (socket)
55 #define E_READ(socket,buf,nbytes) recv(socket,buf,nbytes,0)
56 #define E_WRITE(socket,buf,nbytes) send(socket,buf,nbytes,0)
57 #define E_IS_SOCKET_ERROR(status) ((status) == SOCKET_ERROR)
58 #define E_IS_INVALID_SOCKET(socket) ((socket) == INVALID_SOCKET)
59 #define E_IS_STATUS_INTR() 0 /* No WSAEINTR errors in WinSock2 */
61 #define E_CLOSE(socket) close (socket)
62 #define E_READ(socket,buf,nbytes) read(socket,buf,nbytes)
63 #define E_WRITE(socket,buf,nbytes) write(socket,buf,nbytes)
64 #define E_IS_SOCKET_ERROR(status) ((status) == -1)
65 #define E_IS_INVALID_SOCKET(socket) ((socket) < 0)
66 #define E_IS_STATUS_INTR() (errno == EINTR)
81 SOCKET temp, socket1 = -1, socket2 = -1;
82 struct sockaddr_in saddr;
85 fd_set read_set, write_set;
88 temp = socket (AF_INET, SOCK_STREAM, 0);
90 if (temp == INVALID_SOCKET) {
95 if (ioctlsocket (temp, FIONBIO, &arg) == SOCKET_ERROR) {
99 memset (&saddr, 0, sizeof (saddr));
100 saddr.sin_family = AF_INET;
102 saddr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
104 if (bind (temp, (struct sockaddr *)&saddr, sizeof (saddr))) {
108 if (listen (temp, 1) == SOCKET_ERROR) {
112 len = sizeof (saddr);
113 if (getsockname (temp, (struct sockaddr *)&saddr, &len)) {
117 socket1 = socket (AF_INET, SOCK_STREAM, 0);
119 if (socket1 == INVALID_SOCKET) {
124 if (ioctlsocket (socket1, FIONBIO, &arg) == SOCKET_ERROR) {
128 if (connect (socket1, (struct sockaddr *)&saddr, len) != SOCKET_ERROR ||
129 WSAGetLastError () != WSAEWOULDBLOCK) {
134 FD_SET (temp, &read_set);
139 if (select (0, &read_set, NULL, NULL, NULL) == SOCKET_ERROR) {
143 if (!FD_ISSET (temp, &read_set)) {
147 socket2 = accept (temp, (struct sockaddr *) &saddr, &len);
148 if (socket2 == INVALID_SOCKET) {
152 FD_ZERO (&write_set);
153 FD_SET (socket1, &write_set);
158 if (select (0, NULL, &write_set, NULL, NULL) == SOCKET_ERROR) {
162 if (!FD_ISSET (socket1, &write_set)) {
167 if (ioctlsocket (socket1, FIONBIO, &arg) == SOCKET_ERROR) {
172 if (ioctlsocket (socket2, FIONBIO, &arg) == SOCKET_ERROR) {
184 closesocket (socket2);
186 closesocket (socket1);
189 errno = EMFILE; /* FIXME: use the real syscall errno? */
199 void e_dlist_init(EDList *v)
201 v->head = (EDListNode *)&v->tail;
203 v->tailpred = (EDListNode *)&v->head;
206 EDListNode *e_dlist_addhead(EDList *l, EDListNode *n)
209 n->prev = (EDListNode *)&l->head;
215 EDListNode *e_dlist_addtail(EDList *l, EDListNode *n)
217 n->next = (EDListNode *)&l->tail;
218 n->prev = l->tailpred;
219 l->tailpred->next = n;
224 EDListNode *e_dlist_remove(EDListNode *n)
226 n->next->prev = n->prev;
227 n->prev->next = n->next;
231 EDListNode *e_dlist_remhead(EDList *l)
245 EDListNode *e_dlist_remtail(EDList *l)
259 int e_dlist_empty(EDList *l)
261 return (l->head == (EDListNode *)&l->tail);
264 int e_dlist_length(EDList *l)
282 GHashTable *key_table;
296 * Setup a new timeout cache. @nodesize is the size of nodes in the
297 * cache, and @nodefree will be called to free YOUR content.
302 em_cache_new(time_t timeout, size_t nodesize, GFreeFunc nodefree)
304 struct _EMCache *emc;
306 emc = g_malloc0(sizeof(*emc));
307 emc->node_size = nodesize;
308 emc->key_table = g_hash_table_new(g_str_hash, g_str_equal);
309 emc->node_free = nodefree;
310 e_dlist_init(&emc->lru_list);
311 emc->lock = g_mutex_new();
312 emc->timeout = timeout;
321 * destroy the cache, duh.
324 em_cache_destroy(EMCache *emc)
327 g_mutex_free(emc->lock);
336 * Lookup a cache node. once you're finished with it, you need to
342 em_cache_lookup(EMCache *emc, const char *key)
346 g_mutex_lock(emc->lock);
347 n = g_hash_table_lookup(emc->key_table, key);
349 e_dlist_remove((EDListNode *)n);
350 e_dlist_addhead(&emc->lru_list, (EDListNode *)n);
354 g_mutex_unlock(emc->lock);
356 c(printf("looking up '%s' %s\n", key, n?"found":"not found"));
366 * Create a new key'd cache node. The node will not be added to the
367 * cache until you insert it.
372 em_cache_node_new(EMCache *emc, const char *key)
376 /* this could use memchunks, but its probably overkill */
377 n = g_malloc0(emc->node_size);
378 n->key = g_strdup(key);
384 * em_cache_node_unref:
388 * unref a cache node, you can only unref nodes which have been looked
392 em_cache_node_unref(EMCache *emc, EMCacheNode *n)
394 g_mutex_lock(emc->lock);
395 g_assert(n->ref_count > 0);
397 g_mutex_unlock(emc->lock);
405 * Add a cache node to the cache, once added the memory is owned by
406 * the cache. If there are conflicts and the old node is still in
407 * use, then the new node is not added, otherwise it is added and any
408 * nodes older than the expire time are flushed.
411 em_cache_add(EMCache *emc, EMCacheNode *n)
413 EMCacheNode *old, *prev;
416 e_dlist_init(&old_nodes);
418 g_mutex_lock(emc->lock);
419 old = g_hash_table_lookup(emc->key_table, n->key);
421 if (old->ref_count == 0) {
422 g_hash_table_remove(emc->key_table, old->key);
423 e_dlist_remove((EDListNode *)old);
424 e_dlist_addtail(&old_nodes, (EDListNode *)old);
427 e_dlist_addtail(&old_nodes, (EDListNode *)n);
433 g_hash_table_insert(emc->key_table, n->key, n);
434 e_dlist_addhead(&emc->lru_list, (EDListNode *)n);
438 c(printf("inserting node %s\n", n->key));
440 old = (EMCacheNode *)emc->lru_list.tailpred;
442 while (prev && old->stamp < now - emc->timeout) {
443 if (old->ref_count == 0) {
444 c(printf("expiring node %s\n", old->key));
445 g_hash_table_remove(emc->key_table, old->key);
446 e_dlist_remove((EDListNode *)old);
447 e_dlist_addtail(&old_nodes, (EDListNode *)old);
454 g_mutex_unlock(emc->lock);
456 while ((old = (EMCacheNode *)e_dlist_remhead(&old_nodes))) {
467 * clear the cache. just for api completeness.
470 em_cache_clear(EMCache *emc)
475 e_dlist_init(&old_nodes);
476 g_mutex_lock(emc->lock);
477 while ((node = (EMCacheNode *)e_dlist_remhead(&emc->lru_list)))
478 e_dlist_addtail(&old_nodes, (EDListNode *)node);
479 g_mutex_unlock(emc->lock);
481 while ((node = (EMCacheNode *)e_dlist_remhead(&old_nodes))) {
482 emc->node_free(node);
490 gint pipe[2]; /* on Win32, actually a pair of SOCKETs */
492 PRFileDesc *prpipe[2];
498 MSG_FLAG_SYNC_WITH_PIPE = 1 << 0,
499 MSG_FLAG_SYNC_WITH_PR_PIPE = 1 << 1
504 e_prpipe (PRFileDesc **fds)
507 if (PR_NewTCPSocketPair (fds) != PR_FAILURE)
510 if (PR_CreatePipe (&fds[0], &fds[1]) != PR_FAILURE)
522 msgport_sync_with_pipe (gint fd)
527 if (E_READ (fd, buffer, 1) > 0)
529 else if (!E_IS_STATUS_INTR ()) {
530 g_warning ("%s: Failed to read from pipe: %s",
531 G_STRFUNC, g_strerror (errno));
539 msgport_sync_with_prpipe (PRFileDesc *prfd)
543 while (prfd != NULL) {
544 if (PR_Read (prfd, buffer, 1) > 0)
546 else if (PR_GetError () != PR_PENDING_INTERRUPT_ERROR) {
547 gchar *text = g_alloca (PR_GetErrorTextLength ());
548 PR_GetErrorText (text);
549 g_warning ("%s: Failed to read from NSPR pipe: %s",
562 msgport = g_slice_new (EMsgPort);
563 msgport->queue = g_async_queue_new ();
564 msgport->pipe[0] = -1;
565 msgport->pipe[1] = -1;
567 msgport->prpipe[0] = NULL;
568 msgport->prpipe[1] = NULL;
575 e_msgport_destroy (EMsgPort *msgport)
577 g_return_if_fail (msgport != NULL);
579 if (msgport->pipe[0] >= 0) {
580 E_CLOSE (msgport->pipe[0]);
581 E_CLOSE (msgport->pipe[1]);
584 if (msgport->prpipe[0] != NULL) {
585 PR_Close (msgport->prpipe[0]);
586 PR_Close (msgport->prpipe[1]);
590 g_async_queue_unref (msgport->queue);
591 g_slice_free (EMsgPort, msgport);
595 e_msgport_fd (EMsgPort *msgport)
599 g_return_val_if_fail (msgport != NULL, -1);
601 g_async_queue_lock (msgport->queue);
602 fd = msgport->pipe[0];
603 if (fd < 0 && e_pipe (msgport->pipe) == 0)
604 fd = msgport->pipe[0];
605 g_async_queue_unlock (msgport->queue);
612 e_msgport_prfd (EMsgPort *msgport)
616 g_return_val_if_fail (msgport != NULL, NULL);
618 g_async_queue_lock (msgport->queue);
619 prfd = msgport->prpipe[0];
620 if (prfd == NULL && e_prpipe (msgport->prpipe) == 0)
621 prfd = msgport->prpipe[0];
622 g_async_queue_unlock (msgport->queue);
629 e_msgport_put (EMsgPort *msgport, EMsg *msg)
636 g_return_if_fail (msgport != NULL);
637 g_return_if_fail (msg != NULL);
639 g_async_queue_lock (msgport->queue);
643 fd = msgport->pipe[1];
645 if (E_WRITE (fd, "E", 1) > 0) {
646 msg->flags |= MSG_FLAG_SYNC_WITH_PIPE;
648 } else if (!E_IS_STATUS_INTR ()) {
649 g_warning ("%s: Failed to write to pipe: %s",
650 G_STRFUNC, g_strerror (errno));
656 prfd = msgport->prpipe[1];
657 while (prfd != NULL) {
658 if (PR_Write (prfd, "E", 1) > 0) {
659 msg->flags |= MSG_FLAG_SYNC_WITH_PR_PIPE;
661 } else if (PR_GetError () != PR_PENDING_INTERRUPT_ERROR) {
662 gchar *text = g_alloca (PR_GetErrorTextLength ());
663 PR_GetErrorText (text);
664 g_warning ("%s: Failed to write to NSPR pipe: %s",
671 g_async_queue_push_unlocked (msgport->queue, msg);
672 g_async_queue_unlock (msgport->queue);
676 e_msgport_wait (EMsgPort *msgport)
680 g_return_val_if_fail (msgport != NULL, NULL);
682 g_async_queue_lock (msgport->queue);
684 msg = g_async_queue_pop_unlocked (msgport->queue);
686 g_assert (msg != NULL);
688 if (msg->flags & MSG_FLAG_SYNC_WITH_PIPE)
689 msgport_sync_with_pipe (msgport->pipe[0]);
691 if (msg->flags & MSG_FLAG_SYNC_WITH_PR_PIPE)
692 msgport_sync_with_prpipe (msgport->prpipe[0]);
695 g_async_queue_unlock (msgport->queue);
701 e_msgport_get (EMsgPort *msgport)
705 g_return_val_if_fail (msgport != NULL, NULL);
707 g_async_queue_lock (msgport->queue);
709 msg = g_async_queue_try_pop_unlocked (msgport->queue);
711 if (msg != NULL && msg->flags & MSG_FLAG_SYNC_WITH_PIPE)
712 msgport_sync_with_pipe (msgport->pipe[0]);
714 if (msg != NULL && msg->flags & MSG_FLAG_SYNC_WITH_PR_PIPE)
715 msgport_sync_with_prpipe (msgport->prpipe[0]);
718 g_async_queue_unlock (msgport->queue);
724 e_msgport_reply (EMsg *msg)
726 g_return_if_fail (msg != NULL);
729 e_msgport_put (msg->reply_port, msg);
734 #ifndef EDS_DISABLE_DEPRECATED
736 struct _thread_info {
742 struct _EThread *next;
743 struct _EThread *prev;
745 EMsgPort *server_port;
746 EMsgPort *reply_port;
747 pthread_mutex_t mutex;
751 int waiting; /* if we are waiting for a new message, count of waiting processes */
752 pthread_t id; /* our running child thread */
754 GList *id_list; /* if THREAD_NEW, then a list of our child threads in thread_info structs */
759 EThreadFunc received;
766 /* All active threads */
767 static EDList ethread_list = E_DLIST_INITIALISER(ethread_list);
768 static pthread_mutex_t ethread_lock = PTHREAD_MUTEX_INITIALIZER;
770 #define E_THREAD_QUIT_REPLYPORT ((struct _EMsgPort *)~0)
772 static void thread_destroy_msg(EThread *e, EMsg *m);
774 static struct _thread_info *thread_find(EThread *e, pthread_t id)
777 struct _thread_info *info;
782 if (pthread_equal (info->id, id))
790 static void thread_remove(EThread *e, pthread_t id)
793 struct _thread_info *info;
798 if (pthread_equal (info->id, id)) {
799 e->id_list = g_list_remove(e->id_list, info);
807 EThread *e_thread_new(e_thread_t type)
811 e = g_malloc0(sizeof(*e));
812 pthread_mutex_init(&e->mutex, 0);
814 e->server_port = e_msgport_new();
815 e->have_thread = FALSE;
816 e->queue_limit = INT_MAX;
818 pthread_mutex_lock(ðread_lock);
819 e_dlist_addtail(ðread_list, (EDListNode *)e);
820 pthread_mutex_unlock(ðread_lock);
825 /* close down the threads & resources etc */
826 void e_thread_destroy(EThread *e)
830 struct _thread_info *info;
833 /* make sure we soak up all the messages first */
834 while ( (msg = e_msgport_get(e->server_port)) ) {
835 thread_destroy_msg(e, msg);
838 pthread_mutex_lock(&e->mutex);
843 /* if we have a thread, 'kill' it */
844 if (e->have_thread) {
845 pthread_t id = e->id;
846 t(printf("Sending thread '%" G_GUINT64_FORMAT "' quit message\n", e_util_pthread_id(id)));
848 e->have_thread = FALSE;
850 msg = g_malloc0(sizeof(*msg));
851 msg->reply_port = E_THREAD_QUIT_REPLYPORT;
852 e_msgport_put(e->server_port, msg);
854 pthread_mutex_unlock(&e->mutex);
855 t(printf("Joining thread '%" G_GUINT64_FORMAT "'\n", e_util_pthread_id(id)));
857 t(printf("Joined thread '%" G_GUINT64_FORMAT "'!\n", e_util_pthread_id(id)));
858 pthread_mutex_lock(&e->mutex);
860 busy = e->have_thread;
863 /* first, send everyone a quit message */
867 t(printf("Sending thread '%" G_GUINT64_FORMAT "' quit message\n", e_util_pthread_id(info->id)));
868 msg = g_malloc0(sizeof(*msg));
869 msg->reply_port = E_THREAD_QUIT_REPLYPORT;
870 e_msgport_put(e->server_port, msg);
874 /* then, wait for everyone to quit */
876 info = e->id_list->data;
877 e->id_list = g_list_remove(e->id_list, info);
878 pthread_mutex_unlock(&e->mutex);
879 t(printf("Joining thread '%" G_GUINT64_FORMAT "'\n", e_util_pthread_id(info->id)));
880 pthread_join(info->id, 0);
881 t(printf("Joined thread '%" G_GUINT64_FORMAT "'!\n", e_util_pthread_id(info->id)));
882 pthread_mutex_lock(&e->mutex);
885 busy = g_list_length(e->id_list) != 0;
889 pthread_mutex_unlock(&e->mutex);
891 /* and clean up, if we can */
893 g_warning("threads were busy, leaked EThread");
897 pthread_mutex_lock(ðread_lock);
898 e_dlist_remove((EDListNode *)e);
899 pthread_mutex_unlock(ðread_lock);
901 pthread_mutex_destroy(&e->mutex);
902 e_msgport_destroy(e->server_port);
906 /* set the queue maximum depth, what happens when the queue
907 fills up depends on the queue type */
908 void e_thread_set_queue_limit(EThread *e, int limit)
910 e->queue_limit = limit;
913 /* set a msg destroy callback, this can not call any e_thread functions on @e */
914 void e_thread_set_msg_destroy(EThread *e, EThreadFunc destroy, void *data)
916 pthread_mutex_lock(&e->mutex);
917 e->destroy = destroy;
918 e->destroy_data = data;
919 pthread_mutex_unlock(&e->mutex);
922 /* set a message lost callback, called if any message is discarded */
923 void e_thread_set_msg_lost(EThread *e, EThreadFunc lost, void *data)
925 pthread_mutex_lock(&e->mutex);
928 pthread_mutex_unlock(&e->mutex);
931 /* set a reply port, if set, then send messages back once finished */
932 void e_thread_set_reply_port(EThread *e, EMsgPort *reply_port)
934 e->reply_port = reply_port;
937 /* set a received data callback */
938 void e_thread_set_msg_received(EThread *e, EThreadFunc received, void *data)
940 pthread_mutex_lock(&e->mutex);
941 e->received = received;
942 e->received_data = data;
943 pthread_mutex_unlock(&e->mutex);
946 /* find out if we're busy doing any work, e==NULL, check for all work */
947 int e_thread_busy(EThread *e)
952 pthread_mutex_lock(ðread_lock);
953 e = (EThread *)ethread_list.head;
954 while (e->next && !busy) {
955 busy = e_thread_busy(e);
958 pthread_mutex_unlock(ðread_lock);
960 pthread_mutex_lock(&e->mutex);
964 busy = e->waiting != 1 && e->have_thread;
967 busy = e->waiting != g_list_length(e->id_list);
970 pthread_mutex_unlock(&e->mutex);
977 thread_destroy_msg(EThread *e, EMsg *m)
982 /* we do this so we never get an incomplete/unmatched callback + data */
983 pthread_mutex_lock(&e->mutex);
985 func_data = e->destroy_data;
986 pthread_mutex_unlock(&e->mutex);
989 func(e, m, func_data);
993 thread_received_msg(EThread *e, EMsg *m)
998 /* we do this so we never get an incomplete/unmatched callback + data */
999 pthread_mutex_lock(&e->mutex);
1001 func_data = e->received_data;
1002 pthread_mutex_unlock(&e->mutex);
1005 func(e, m, func_data);
1007 g_warning("No processing callback for EThread, message unprocessed");
1011 thread_lost_msg(EThread *e, EMsg *m)
1016 /* we do this so we never get an incomplete/unmatched callback + data */
1017 pthread_mutex_lock(&e->mutex);
1019 func_data = e->lost_data;
1020 pthread_mutex_unlock(&e->mutex);
1023 func(e, m, func_data);
1026 /* the actual thread dispatcher */
1028 thread_dispatch(void *din)
1032 struct _thread_info *info;
1033 pthread_t self = pthread_self();
1035 t(printf("dispatch thread started: %" G_GUINT64_FORMAT "\n", e_util_pthread_id(self)));
1038 pthread_mutex_lock(&e->mutex);
1039 m = e_msgport_get(e->server_port);
1041 /* nothing to do? If we are a 'new' type thread, just quit.
1042 Otherwise, go into waiting (can be cancelled here) */
1043 info = thread_find(e, self);
1047 pthread_mutex_unlock(&e->mutex);
1048 m = e_msgport_wait(e->server_port);
1049 pthread_mutex_lock(&e->mutex);
1053 if (m->reply_port == E_THREAD_QUIT_REPLYPORT) {
1054 t(printf("Thread %" G_GUINT64_FORMAT " got quit message\n", e_util_pthread_id(self)));
1055 /* Handle a quit message, say we're quitting, free the message, and break out of the loop */
1056 info = thread_find(e, self);
1059 pthread_mutex_unlock(&e->mutex);
1063 info = thread_find(e, self);
1067 pthread_mutex_unlock(&e->mutex);
1069 t(printf("got message in dispatch thread\n"));
1072 thread_received_msg(e, m);
1074 /* if we have a reply port, send it back, otherwise, lose it */
1075 if (m->reply_port) {
1078 thread_destroy_msg(e, m);
1085 /* send a message to the thread, start thread if necessary */
1086 void e_thread_put(EThread *e, EMsg *msg)
1091 pthread_mutex_lock(&e->mutex);
1093 /* the caller forgot to tell us what to do, well, we can't do anything can we */
1094 if (e->received == NULL) {
1095 pthread_mutex_unlock(&e->mutex);
1096 g_warning("EThread called with no receiver function, no work to do!");
1097 thread_destroy_msg(e, msg);
1101 msg->reply_port = e->reply_port;
1104 case E_THREAD_QUEUE:
1105 /* if the queue is full, lose this new addition */
1106 if (g_async_queue_length(e->server_port->queue) < e->queue_limit) {
1107 e_msgport_put(e->server_port, msg);
1109 printf("queue limit reached, dropping new message\n");
1114 /* if the queue is full, lose the oldest (unprocessed) message */
1115 if (g_async_queue_length(e->server_port->queue) < e->queue_limit) {
1116 e_msgport_put(e->server_port, msg);
1118 printf("queue limit reached, dropping old message\n");
1119 e_msgport_put(e->server_port, msg);
1120 dmsg = e_msgport_get(e->server_port);
1124 /* it is possible that an existing thread can catch this message, so
1125 we might create a thread with no work to do.
1126 but that doesn't matter, the other alternative that it be lost is worse */
1127 e_msgport_put(e->server_port, msg);
1129 && g_list_length(e->id_list) < e->queue_limit
1130 && pthread_create(&id, NULL, thread_dispatch, e) == 0) {
1131 struct _thread_info *info = g_malloc0(sizeof(*info));
1132 t(printf("created NEW thread %" G_GUINT64_FORMAT "\n", e_util_pthread_id(id)));
1135 e->id_list = g_list_append(e->id_list, info);
1137 pthread_mutex_unlock(&e->mutex);
1141 /* create the thread, if there is none to receive it yet */
1142 if (!e->have_thread) {
1145 if ((err = pthread_create(&e->id, NULL, thread_dispatch, e)) != 0) {
1146 g_warning("Could not create dispatcher thread, message queued?: %s", strerror(err));
1148 e->have_thread = TRUE;
1152 pthread_mutex_unlock(&e->mutex);
1155 thread_lost_msg(e, dmsg);
1156 thread_destroy_msg(e, dmsg);
1159 #endif /* EDS_DISABLE_DEPRECATED */
1161 /* yet-another-mutex interface */
1168 pthread_mutex_t mutex;
1169 pthread_cond_t cond;
1172 /* sigh, this is just painful to have to need, but recursive
1173 read/write, etc mutexes just aren't very common in thread
1175 /* TODO: Just make it use recursive mutexes if they are available */
1176 EMutex *e_mutex_new(e_mutex_t type)
1180 m = g_malloc(sizeof(*m));
1184 m->have_owner = FALSE;
1187 case E_MUTEX_SIMPLE:
1188 pthread_mutex_init(&m->mutex, 0);
1191 pthread_mutex_init(&m->mutex, 0);
1192 pthread_cond_init(&m->cond, 0);
1194 /* read / write ? flags for same? */
1200 int e_mutex_destroy(EMutex *m)
1205 case E_MUTEX_SIMPLE:
1206 ret = pthread_mutex_destroy(&m->mutex);
1208 g_warning("EMutex destroy failed: %s", strerror(errno));
1212 ret = pthread_mutex_destroy(&m->mutex);
1214 g_warning("EMutex destroy failed: %s", strerror(errno));
1215 ret = pthread_cond_destroy(&m->cond);
1217 g_warning("EMutex destroy failed: %s", strerror(errno));
1224 int e_mutex_lock(EMutex *m)
1230 case E_MUTEX_SIMPLE:
1231 return pthread_mutex_lock(&m->mutex);
1233 id = pthread_self();
1234 if ((err = pthread_mutex_lock(&m->mutex)) != 0)
1237 if (!m->have_owner) {
1239 m->have_owner = TRUE;
1242 } else if (pthread_equal (id, m->owner)) {
1247 if ((err = pthread_cond_wait(&m->cond, &m->mutex)) != 0)
1252 return pthread_mutex_unlock(&m->mutex);
1258 int e_mutex_unlock(EMutex *m)
1263 case E_MUTEX_SIMPLE:
1264 return pthread_mutex_unlock(&m->mutex);
1266 if ((err = pthread_mutex_lock(&m->mutex)) != 0)
1268 g_assert(m->have_owner && pthread_equal(m->owner, pthread_self()));
1271 if (m->depth == 0) {
1272 m->have_owner = FALSE;
1274 pthread_cond_signal(&m->cond);
1276 return pthread_mutex_unlock(&m->mutex);
1283 void e_mutex_assert_locked(EMutex *m)
1285 g_return_if_fail (m->type == E_MUTEX_REC);
1286 pthread_mutex_lock(&m->mutex);
1287 g_assert(m->have_owner && pthread_equal(m->owner, pthread_self()));
1288 pthread_mutex_unlock(&m->mutex);
1291 int e_mutex_cond_wait(void *vcond, EMutex *m)
1294 pthread_cond_t *cond = vcond;
1297 case E_MUTEX_SIMPLE:
1298 return pthread_cond_wait(cond, &m->mutex);
1300 if ((ret = pthread_mutex_lock(&m->mutex)) != 0)
1302 g_assert(m->have_owner && pthread_equal(m->owner, pthread_self()));
1303 ret = pthread_cond_wait(cond, &m->mutex);
1304 g_assert(m->have_owner && pthread_equal(m->owner, pthread_self()));
1305 pthread_mutex_unlock(&m->mutex);
1308 g_return_val_if_reached(-1);
1314 static EMsgPort *server_port;
1316 void *fdserver(void *data)
1323 fd = e_msgport_fd(server_port);
1328 printf("server %d: waiting on fd %d\n", id, fd);
1331 select(fd+1, &rfds, NULL, NULL, NULL);
1332 printf("server %d: Got async notification, checking for messages\n", id);
1333 while ((msg = e_msgport_get(server_port))) {
1334 printf("server %d: got message\n", id);
1336 printf("server %d: replying\n", id);
1337 e_msgport_reply(msg);
1340 printf("server %d: got %d messages\n", id, count);
1344 void *server(void *data)
1350 printf("server %d: waiting\n", id);
1351 msg = e_msgport_wait(server_port);
1353 printf("server %d: got message\n", id);
1355 printf("server %d: replying\n", id);
1356 e_msgport_reply(msg);
1358 printf("server %d: didn't get message\n", id);
1364 void *client(void *data)
1367 EMsgPort *replyport;
1370 replyport = e_msgport_new();
1371 msg = g_malloc0(sizeof(*msg));
1372 msg->reply_port = replyport;
1373 for (i=0;i<10;i++) {
1374 /* synchronous operation */
1375 printf("client: sending\n");
1376 e_msgport_put(server_port, msg);
1377 printf("client: waiting for reply\n");
1378 e_msgport_wait(replyport);
1379 printf("client: got reply\n");
1382 printf("client: sleeping ...\n");
1384 printf("client: sending multiple\n");
1386 for (i=0;i<10;i++) {
1387 msg = g_malloc0(sizeof(*msg));
1388 msg->reply_port = replyport;
1389 e_msgport_put(server_port, msg);
1392 printf("client: receiving multiple\n");
1393 for (i=0;i<10;i++) {
1394 msg = e_msgport_wait(replyport);
1398 printf("client: done\n");
1402 int main(int argc, char **argv)
1404 pthread_t serverid, clientid;
1406 g_thread_init(NULL);
1411 if (WSAStartup (MAKEWORD (2, 0), &wsadata) != 0)
1412 g_error ("Windows Sockets could not be initialized");
1416 server_port = e_msgport_new();
1418 /*pthread_create(&serverid, NULL, server, (void *)1);*/
1419 pthread_create(&serverid, NULL, fdserver, (void *)1);
1420 pthread_create(&clientid, NULL, client, NULL);