Extending test-client-custom-summary to try e_book_client_get_contacts_uids()
[platform/upstream/evolution-data-server.git] / camel / camel-msgport.c
1 /* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
2 /*
3  * Copyright (C) 1999-2008 Novell, Inc. (www.novell.com)
4  *
5  * This program is free software; you can redistribute it and/or
6  * modify it under the terms of version 2 of the GNU Lesser General Public
7  * License as published by the Free Software Foundation.
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with this program; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  *
19  */
20
21 #ifdef HAVE_CONFIG_H
22 #include <config.h>
23 #endif
24
25 #include <errno.h>
26 #include <unistd.h>
27
28 #include <nspr.h>
29
30 #ifdef G_OS_WIN32
31 #define WIN32_LEAN_AND_MEAN
32 #include <winsock2.h>
33 #endif
34
35 #include "camel-msgport.h"
36
37 #ifdef G_OS_WIN32
38 #define MP_CLOSE(socket)                closesocket (socket)
39 #define MP_READ(socket, buf, nbytes)    recv((socket), (buf), (nbytes), 0)
40 #define MP_WRITE(socket, buf, nbytes)   send((socket), (buf), (nbytes), 0)
41 #define MP_IS_STATUS_INTR()             0 /* No WSAEINTR errors in WinSock2 */
42 #else
43 #define MP_CLOSE(socket)                close (socket)
44 #define MP_READ(socket, buf, nbytes)    read((socket), (buf), (nbytes))
45 #define MP_WRITE(socket, buf, nbytes)   write((socket), (buf), (nbytes))
46 #define MP_IS_STATUS_INTR()             (errno == EINTR)
47 #endif
48
49 /* message flags */
50 enum {
51         MSG_FLAG_SYNC_WITH_PIPE    = 1 << 0,
52         MSG_FLAG_SYNC_WITH_PR_PIPE = 1 << 1
53 };
54
55 struct _CamelMsgPort {
56         GAsyncQueue *queue;
57         gint pipe[2];  /* on Win32, actually a pair of SOCKETs */
58         PRFileDesc *prpipe[2];
59 };
60
61 static gint
62 msgport_pipe (gint *fds)
63 {
64 #ifndef G_OS_WIN32
65         if (pipe (fds) != -1)
66                 return 0;
67
68         fds[0] = -1;
69         fds[1] = -1;
70
71         return -1;
72 #else
73         SOCKET temp, socket1 = -1, socket2 = -1;
74         struct sockaddr_in saddr;
75         gint len;
76         u_long arg;
77         fd_set read_set, write_set;
78         struct timeval tv;
79
80         temp = socket (AF_INET, SOCK_STREAM, 0);
81
82         if (temp == INVALID_SOCKET) {
83                 goto out0;
84         }
85
86         arg = 1;
87         if (ioctlsocket (temp, FIONBIO, &arg) == SOCKET_ERROR) {
88                 goto out0;
89         }
90
91         memset (&saddr, 0, sizeof (saddr));
92         saddr.sin_family = AF_INET;
93         saddr.sin_port = 0;
94         saddr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
95
96         if (bind (temp, (struct sockaddr *) &saddr, sizeof (saddr))) {
97                 goto out0;
98         }
99
100         if (listen (temp, 1) == SOCKET_ERROR) {
101                 goto out0;
102         }
103
104         len = sizeof (saddr);
105         if (getsockname (temp, (struct sockaddr *) &saddr, &len)) {
106                 goto out0;
107         }
108
109         socket1 = socket (AF_INET, SOCK_STREAM, 0);
110
111         if (socket1 == INVALID_SOCKET) {
112                 goto out0;
113         }
114
115         arg = 1;
116         if (ioctlsocket (socket1, FIONBIO, &arg) == SOCKET_ERROR) {
117                 goto out1;
118         }
119
120         if (connect (socket1, (struct sockaddr  *) &saddr, len) != SOCKET_ERROR ||
121             WSAGetLastError () != WSAEWOULDBLOCK) {
122                 goto out1;
123         }
124
125         FD_ZERO (&read_set);
126         FD_SET (temp, &read_set);
127
128         tv.tv_sec = 0;
129         tv.tv_usec = 0;
130
131         if (select (0, &read_set, NULL, NULL, NULL) == SOCKET_ERROR) {
132                 goto out1;
133         }
134
135         if (!FD_ISSET (temp, &read_set)) {
136                 goto out1;
137         }
138
139         socket2 = accept (temp, (struct sockaddr *) &saddr, &len);
140         if (socket2 == INVALID_SOCKET) {
141                 goto out1;
142         }
143
144         FD_ZERO (&write_set);
145         FD_SET (socket1, &write_set);
146
147         tv.tv_sec = 0;
148         tv.tv_usec = 0;
149
150         if (select (0, NULL, &write_set, NULL, NULL) == SOCKET_ERROR) {
151                 goto out2;
152         }
153
154         if (!FD_ISSET (socket1, &write_set)) {
155                 goto out2;
156         }
157
158         arg = 0;
159         if (ioctlsocket (socket1, FIONBIO, &arg) == SOCKET_ERROR) {
160                 goto out2;
161         }
162
163         arg = 0;
164         if (ioctlsocket (socket2, FIONBIO, &arg) == SOCKET_ERROR) {
165                 goto out2;
166         }
167
168         fds[0] = socket1;
169         fds[1] = socket2;
170
171         closesocket (temp);
172
173         return 0;
174
175 out2:
176         closesocket (socket2);
177 out1:
178         closesocket (socket1);
179 out0:
180         closesocket (temp);
181         errno = EMFILE;         /* FIXME: use the real syscall errno? */
182
183         fds[0] = -1;
184         fds[1] = -1;
185
186         return -1;
187
188 #endif
189 }
190
191 static gint
192 msgport_prpipe (PRFileDesc **fds)
193 {
194 #ifdef G_OS_WIN32
195         if (PR_NewTCPSocketPair (fds) != PR_FAILURE)
196                 return 0;
197 #else
198         if (PR_CreatePipe (&fds[0], &fds[1]) != PR_FAILURE)
199                 return 0;
200 #endif
201
202         fds[0] = NULL;
203         fds[1] = NULL;
204
205         return -1;
206 }
207
208 static void
209 msgport_sync_with_pipe (gint fd)
210 {
211         gchar buffer[1];
212
213         while (fd >= 0) {
214                 if (MP_READ (fd, buffer, 1) > 0)
215                         break;
216                 else if (!MP_IS_STATUS_INTR ()) {
217                         g_warning (
218                                 "%s: Failed to read from pipe: %s",
219                                 G_STRFUNC, g_strerror (errno));
220                         break;
221                 }
222         }
223 }
224
225 static void
226 msgport_sync_with_prpipe (PRFileDesc *prfd)
227 {
228         gchar buffer[1];
229
230         while (prfd != NULL) {
231                 if (PR_Read (prfd, buffer, 1) > 0)
232                         break;
233                 else if (PR_GetError () != PR_PENDING_INTERRUPT_ERROR) {
234                         gchar *text = g_alloca (PR_GetErrorTextLength ());
235                         PR_GetErrorText (text);
236                         g_warning (
237                                 "%s: Failed to read from NSPR pipe: %s",
238                                 G_STRFUNC, text);
239                         break;
240                 }
241         }
242 }
243
244 /**
245  * camel_msgport_new:
246  *
247  * Since: 2.24
248  **/
249 CamelMsgPort *
250 camel_msgport_new (void)
251 {
252         CamelMsgPort *msgport;
253
254         msgport = g_slice_new (CamelMsgPort);
255         msgport->queue = g_async_queue_new ();
256         msgport->pipe[0] = -1;
257         msgport->pipe[1] = -1;
258         msgport->prpipe[0] = NULL;
259         msgport->prpipe[1] = NULL;
260
261         return msgport;
262 }
263
264 /**
265  * camel_msgport_destroy:
266  *
267  * Since: 2.24
268  **/
269 void
270 camel_msgport_destroy (CamelMsgPort *msgport)
271 {
272         g_return_if_fail (msgport != NULL);
273
274         if (msgport->pipe[0] >= 0) {
275                 MP_CLOSE (msgport->pipe[0]);
276                 MP_CLOSE (msgport->pipe[1]);
277         }
278         if (msgport->prpipe[0] != NULL) {
279                 PR_Close (msgport->prpipe[0]);
280                 PR_Close (msgport->prpipe[1]);
281         }
282
283         g_async_queue_unref (msgport->queue);
284         g_slice_free (CamelMsgPort, msgport);
285 }
286
287 /**
288  * camel_msgport_fd:
289  *
290  * Since: 2.24
291  **/
292 gint
293 camel_msgport_fd (CamelMsgPort *msgport)
294 {
295         gint fd;
296
297         g_return_val_if_fail (msgport != NULL, -1);
298
299         g_async_queue_lock (msgport->queue);
300         fd = msgport->pipe[0];
301         if (fd < 0 && msgport_pipe (msgport->pipe) == 0)
302                 fd = msgport->pipe[0];
303         g_async_queue_unlock (msgport->queue);
304
305         return fd;
306 }
307
308 /**
309  * camel_msgport_prfd:
310  *
311  * Since: 2.24
312  **/
313 PRFileDesc *
314 camel_msgport_prfd (CamelMsgPort *msgport)
315 {
316         PRFileDesc *prfd;
317
318         g_return_val_if_fail (msgport != NULL, NULL);
319
320         g_async_queue_lock (msgport->queue);
321         prfd = msgport->prpipe[0];
322         if (prfd == NULL && msgport_prpipe (msgport->prpipe) == 0)
323                 prfd = msgport->prpipe[0];
324         g_async_queue_unlock (msgport->queue);
325
326         return prfd;
327 }
328
329 /**
330  * camel_msgport_push:
331  *
332  * Since: 2.24
333  **/
334 void
335 camel_msgport_push (CamelMsgPort *msgport,
336                     CamelMsg *msg)
337 {
338         gint fd;
339         PRFileDesc *prfd;
340
341         g_return_if_fail (msgport != NULL);
342         g_return_if_fail (msg != NULL);
343
344         g_async_queue_lock (msgport->queue);
345
346         msg->flags = 0;
347
348         fd = msgport->pipe[1];
349         while (fd >= 0) {
350                 if (MP_WRITE (fd, "E", 1) > 0) {
351                         msg->flags |= MSG_FLAG_SYNC_WITH_PIPE;
352                         break;
353                 } else if (!MP_IS_STATUS_INTR ()) {
354                         g_warning (
355                                 "%s: Failed to write to pipe: %s",
356                                 G_STRFUNC, g_strerror (errno));
357                         break;
358                 }
359         }
360
361         prfd = msgport->prpipe[1];
362         while (prfd != NULL) {
363                 if (PR_Write (prfd, "E", 1) > 0) {
364                         msg->flags |= MSG_FLAG_SYNC_WITH_PR_PIPE;
365                         break;
366                 } else if (PR_GetError () != PR_PENDING_INTERRUPT_ERROR) {
367                         gchar *text = g_alloca (PR_GetErrorTextLength ());
368                         PR_GetErrorText (text);
369                         g_warning (
370                                 "%s: Failed to write to NSPR pipe: %s",
371                                 G_STRFUNC, text);
372                         break;
373                 }
374         }
375
376         g_async_queue_push_unlocked (msgport->queue, msg);
377         g_async_queue_unlock (msgport->queue);
378 }
379
380 /**
381  * camel_msgport_pop:
382  *
383  * Since: 2.24
384  **/
385 CamelMsg *
386 camel_msgport_pop (CamelMsgPort *msgport)
387 {
388         CamelMsg *msg;
389
390         g_return_val_if_fail (msgport != NULL, NULL);
391
392         g_async_queue_lock (msgport->queue);
393
394         msg = g_async_queue_pop_unlocked (msgport->queue);
395
396         g_assert (msg != NULL);
397
398         if (msg->flags & MSG_FLAG_SYNC_WITH_PIPE)
399                 msgport_sync_with_pipe (msgport->pipe[0]);
400         if (msg->flags & MSG_FLAG_SYNC_WITH_PR_PIPE)
401                 msgport_sync_with_prpipe (msgport->prpipe[0]);
402
403         g_async_queue_unlock (msgport->queue);
404
405         return msg;
406 }
407
408 /**
409  * camel_msgport_try_pop:
410  *
411  * Since: 2.24
412  **/
413 CamelMsg *
414 camel_msgport_try_pop (CamelMsgPort *msgport)
415 {
416         CamelMsg *msg;
417
418         g_return_val_if_fail (msgport != NULL, NULL);
419
420         g_async_queue_lock (msgport->queue);
421
422         msg = g_async_queue_try_pop_unlocked (msgport->queue);
423
424         if (msg != NULL && msg->flags & MSG_FLAG_SYNC_WITH_PIPE)
425                 msgport_sync_with_pipe (msgport->pipe[0]);
426         if (msg != NULL && msg->flags & MSG_FLAG_SYNC_WITH_PR_PIPE)
427                 msgport_sync_with_prpipe (msgport->prpipe[0]);
428
429         g_async_queue_unlock (msgport->queue);
430
431         return msg;
432 }
433
434 /**
435  * camel_msgport_timeout_pop:
436  * @msgport: a #CamelMsgPort
437  * @timeout: number of microseconds to wait
438  *
439  * Since: 3.8
440  **/
441 CamelMsg *
442 camel_msgport_timeout_pop (CamelMsgPort *msgport,
443                            guint64 timeout)
444 {
445         CamelMsg *msg;
446
447         g_return_val_if_fail (msgport != NULL, NULL);
448
449         g_async_queue_lock (msgport->queue);
450
451         msg = g_async_queue_timeout_pop_unlocked (msgport->queue, timeout);
452
453         if (msg != NULL && msg->flags & MSG_FLAG_SYNC_WITH_PIPE)
454                 msgport_sync_with_pipe (msgport->pipe[0]);
455         if (msg != NULL && msg->flags & MSG_FLAG_SYNC_WITH_PR_PIPE)
456                 msgport_sync_with_prpipe (msgport->prpipe[0]);
457
458         g_async_queue_unlock (msgport->queue);
459
460         return msg;
461 }
462
463 /**
464  * camel_msgport_reply:
465  *
466  * Since: 2.24
467  **/
468 void
469 camel_msgport_reply (CamelMsg *msg)
470 {
471         g_return_if_fail (msg != NULL);
472
473         if (msg->reply_port)
474                 camel_msgport_push (msg->reply_port, msg);
475
476         /* else lost? */
477 }