Tizen 2.1 base
[platform/upstream/gcd.git] / kqueue-1.0.4 / src / common / kqueue.c
1 /*
2  * Copyright (c) 2009 Mark Heily <mark@heily.com>
3  *
4  * Permission to use, copy, modify, and distribute this software for any
5  * purpose with or without fee is hereby granted, provided that the above
6  * copyright notice and this permission notice appear in all copies.
7  *
8  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
9  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
11  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
14  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
15  */
16
17 #include <errno.h>
18 #include <fcntl.h>
19 #include <poll.h>
20 #include <pthread.h>
21 #include <signal.h>
22 #include <stdlib.h>
23 #include <stdio.h>
24 #include <sys/queue.h>
25 #include <sys/socket.h>
26 #include <sys/types.h>
27 #include <string.h>
28 #include <unistd.h>
29
30 #include "sys/event.h"
31 #include "private.h"
32
33 #ifndef NDEBUG
34 int KQUEUE_DEBUG = 0;
35 #endif
36
37 static RB_HEAD(kqt, kqueue) kqtree       = RB_INITIALIZER(&kqtree);
38 static pthread_rwlock_t     kqtree_mtx   = PTHREAD_RWLOCK_INITIALIZER;
39
40 static int
41 kqueue_cmp(struct kqueue *a, struct kqueue *b)
42 {
43     return memcmp(&a->kq_sockfd[1], &b->kq_sockfd[1], sizeof(int)); 
44 }
45
46 RB_GENERATE(kqt, kqueue, entries, kqueue_cmp)
47
48 /* Must hold the kqtree_mtx when calling this */
49 static void
50 kqueue_free(struct kqueue *kq)
51 {
52     RB_REMOVE(kqt, &kqtree, kq);
53     filter_unregister_all(kq);
54 #if defined(__sun__)
55     port_event_t *pe = (port_event_t *) pthread_getspecific(kq->kq_port_event);
56
57     if (kq->kq_port > 0) 
58         close(kq->kq_port);
59     free(pe);
60 #endif
61     free(kq);
62 }
63
64 static int
65 kqueue_gc(void)
66 {
67     int rv;
68     struct kqueue *n1, *n2;
69
70     /* Free any kqueue descriptor that is no longer needed */
71     /* Sadly O(N), however needed in the case that a descriptor is
72        closed and kevent(2) will never again be called on it. */
73     for (n1 = RB_MIN(kqt, &kqtree); n1 != NULL; n1 = n2) {
74         n2 = RB_NEXT(kqt, &kqtree, n1);
75
76         if (n1->kq_ref == 0) {
77             kqueue_free(n1);
78         } else {
79             rv = kqueue_validate(n1);
80             if (rv == 0) 
81                 kqueue_free(n1);
82             else if (rv < 0) 
83                 return (-1);
84         }
85     }
86
87     return (0);
88 }
89
90
91 int
92 kqueue_validate(struct kqueue *kq)
93 {
94     int rv;
95     char buf[1];
96     struct pollfd pfd;
97
98     pfd.fd = kq->kq_sockfd[0];
99     pfd.events = POLLIN | POLLHUP;
100     pfd.revents = 0;
101
102     rv = poll(&pfd, 1, 0);
103     if (rv == 0)
104         return (1);
105     if (rv < 0) {
106         dbg_perror("poll(2)");
107         return (-1);
108     }
109     if (rv > 0) {
110         /* NOTE: If the caller accidentally writes to the kqfd, it will
111                  be considered invalid. */
112         rv = recv(kq->kq_sockfd[0], buf, sizeof(buf), MSG_PEEK | MSG_DONTWAIT);
113         if (rv == 0) 
114             return (0);
115         else
116             return (-1);
117     }
118
119     return (0);
120 }
121
122 void
123 kqueue_put(struct kqueue *kq)
124 {
125     atomic_dec(&kq->kq_ref);
126 }
127
128 struct kqueue *
129 kqueue_get(int kq)
130 {
131     struct kqueue query;
132     struct kqueue *ent = NULL;
133
134     query.kq_sockfd[1] = kq;
135     pthread_rwlock_rdlock(&kqtree_mtx);
136     ent = RB_FIND(kqt, &kqtree, &query);
137     pthread_rwlock_unlock(&kqtree_mtx);
138
139     /* Check for invalid kqueue objects still in the tree */
140     if (ent != NULL && (ent->kq_sockfd[0] < 0 || ent->kq_ref == 0))
141         ent = NULL;
142     else
143         atomic_inc(&ent->kq_ref);
144
145     return (ent);
146 }
147
148 /* Non-portable kqueue initalization code. */
149 static int
150 kqueue_sys_init(struct kqueue *kq)
151 {
152 #if defined(__sun__)
153     port_event_t *pe;
154
155     if ((kq->kq_port = port_create()) < 0) {
156         dbg_perror("port_create(2)");
157         return (-1);
158     }
159     if (pthread_key_create(&kq->kq_port_event, NULL) != 0)
160        abort();
161     if ((pe = calloc(1, sizeof(*pe))) == NULL) 
162        abort();
163     if (pthread_setspecific(kq->kq_port_event, pe) != 0)
164        abort();
165 #endif
166     return (0);
167 }
168
169 int __attribute__((visibility("default")))
170 kqueue(void)
171 {
172     struct kqueue *kq;
173     int tmp;
174
175     kq = calloc(1, sizeof(*kq));
176     if (kq == NULL)
177         return (-1);
178     kq->kq_ref = 1;
179     pthread_mutex_init(&kq->kq_mtx, NULL);
180
181 #ifdef NDEBUG
182     KQUEUE_DEBUG = 0;
183 #else
184     KQUEUE_DEBUG = (getenv("KQUEUE_DEBUG") == NULL) ? 0 : 1;
185 #endif
186
187     if (socketpair(AF_UNIX, SOCK_STREAM, 0, kq->kq_sockfd) < 0) 
188         goto errout_unlocked;
189
190     if (kqueue_sys_init(kq) < 0)
191         goto errout_unlocked;
192
193     pthread_rwlock_wrlock(&kqtree_mtx);
194     if (kqueue_gc() < 0)
195         goto errout;
196     /* TODO: move outside of the lock if it is safe */
197     if (filter_register_all(kq) < 0)
198         goto errout;
199     RB_INSERT(kqt, &kqtree, kq);
200     pthread_rwlock_unlock(&kqtree_mtx);
201
202     dbg_printf("created kqueue, fd=%d", kq->kq_sockfd[1]);
203     return (kq->kq_sockfd[1]);
204
205 errout:
206     pthread_rwlock_unlock(&kqtree_mtx);
207
208 errout_unlocked:
209     if (kq->kq_sockfd[0] != kq->kq_sockfd[1]) {
210         tmp = errno;
211         (void)close(kq->kq_sockfd[0]);
212         (void)close(kq->kq_sockfd[1]);
213         errno = tmp;
214     }
215 #if defined(__sun__)
216     if (kq->kq_port > 0) 
217         close(kq->kq_port);
218 #endif
219     free(kq);
220     return (-1);
221 }