2 * Copyright (c) 2009 Mark Heily <mark@heily.com>
4 * Permission to use, copy, modify, and distribute this software for any
5 * purpose with or without fee is hereby granted, provided that the above
6 * copyright notice and this permission notice appear in all copies.
8 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
9 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
11 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
14 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
24 #include <sys/queue.h>
25 #include <sys/socket.h>
26 #include <sys/types.h>
30 #include "sys/event.h"
/* Global red-black tree of every live kqueue object, keyed on the
 * application-visible descriptor (kq_sockfd[1]); ordering is defined by
 * kqueue_cmp() below. */
37 static RB_HEAD(kqt, kqueue) kqtree = RB_INITIALIZER(&kqtree);
/* Reader/writer lock guarding kqtree: lookups take the read lock,
 * insert/remove take the write lock. */
38 static pthread_rwlock_t kqtree_mtx = PTHREAD_RWLOCK_INITIALIZER;
/* Ordering predicate for the kqueue tree.
 * Compares the descriptor field (kq_sockfd[1]) byte-wise with memcmp over
 * sizeof(int) bytes.  NOTE(review): this gives a consistent total order
 * (sufficient for RB lookups/equality) but not the numeric order of the
 * fds on little-endian hosts. */
41 kqueue_cmp(struct kqueue *a, struct kqueue *b)
43     return memcmp(&a->kq_sockfd[1], &b->kq_sockfd[1], sizeof(int));
/* Instantiate the RB-tree operations (RB_FIND/RB_INSERT/...) for kqt. */
46 RB_GENERATE(kqt, kqueue, entries, kqueue_cmp)
/* Destroy a kqueue object: unlink it from the global tree and release its
 * registered filters and the calling thread's event-port buffer. */
48 /* Must hold the kqtree_mtx when calling this */
50 kqueue_free(struct kqueue *kq)
52     RB_REMOVE(kqt, &kqtree, kq);
53     filter_unregister_all(kq);
/* Fetch the per-thread Solaris port_event_t buffer installed by
 * kqueue_sys_init().  NOTE(review): the lines that presumably free(pe)
 * and close descriptors are elided in this view — confirm in full source. */
55     port_event_t *pe = (port_event_t *) pthread_getspecific(kq->kq_port_event);
/* Fragment of the garbage-collection pass (enclosing function header not
 * visible here).  Walks the whole tree so that kqueues whose descriptor the
 * application has closed can be reclaimed even though kevent(2) will never
 * be called on them again. */
68     struct kqueue *n1, *n2;
70     /* Free any kqueue descriptor that is no longer needed */
71     /* Sadly O(N), however needed in the case that a descriptor is
72        closed and kevent(2) will never again be called on it. */
/* Save the successor before visiting n1, since n1 may be removed/freed
 * during this iteration. */
73     for (n1 = RB_MIN(kqt, &kqtree); n1 != NULL; n1 = n2) {
74         n2 = RB_NEXT(kqt, &kqtree, n1);
/* Only unreferenced objects are candidates; validate whether the
 * application-side fd is still alive before freeing. */
76         if (n1->kq_ref == 0) {
79             rv = kqueue_validate(n1);
/* Probe whether a kqueue's application-side descriptor is still open by
 * polling the library-side end of the internal socketpair. */
92 kqueue_validate(struct kqueue *kq)
98     pfd.fd = kq->kq_sockfd[0];
/* NOTE(review): POLLHUP is an output-only revents flag per POSIX; listing
 * it in .events is ignored by poll(2).  Harmless, but POLLIN alone would
 * express the intent. */
99     pfd.events = POLLIN | POLLHUP;
/* Zero timeout: non-blocking liveness check only. */
102     rv = poll(&pfd, 1, 0);
106         dbg_perror("poll(2)");
110     /* NOTE: If the caller accidentally writes to the kqfd, it will
111        be considered invalid. */
/* MSG_PEEK leaves any data in place; MSG_DONTWAIT keeps this
 * non-blocking.  Presumably rv==0 (EOF) marks the peer end closed —
 * the branch that interprets rv is elided in this view. */
112     rv = recv(kq->kq_sockfd[0], buf, sizeof(buf), MSG_PEEK | MSG_DONTWAIT);
/* Release a reference obtained via kqueue_get(); the object itself is
 * reclaimed later by the GC pass, not here. */
123 kqueue_put(struct kqueue *kq)
125     atomic_dec(&kq->kq_ref);
/* Interior of kqueue_get() (function header not visible here): translate
 * an application fd into its struct kqueue and take a reference. */
132     struct kqueue *ent = NULL;
/* Build a stack "query" key holding only the field kqueue_cmp() reads. */
134     query.kq_sockfd[1] = kq;
135     pthread_rwlock_rdlock(&kqtree_mtx);
136     ent = RB_FIND(kqt, &kqtree, &query);
137     pthread_rwlock_unlock(&kqtree_mtx);
139     /* Check for invalid kqueue objects still in the tree */
/* NOTE(review): ent's fields are inspected and the refcount incremented
 * AFTER the read lock is dropped; this looks racy against kqueue_free()
 * running under the write lock — confirm against the full source. */
140     if (ent != NULL && (ent->kq_sockfd[0] < 0 || ent->kq_ref == 0))
143         atomic_inc(&ent->kq_ref);
/* Solaris-specific setup: create the event port backing this kqueue and a
 * per-thread buffer (TSD key) for retrieved port events.
 * NOTE(review): error-path cleanup (closing kq_port, deleting the TSD key,
 * freeing pe on a later failure) is elided in this view — confirm there is
 * no leak on partial initialization. */
148 /* Non-portable kqueue initalization code. */
150 kqueue_sys_init(struct kqueue *kq)
155     if ((kq->kq_port = port_create()) < 0) {
156         dbg_perror("port_create(2)");
/* Key created with no destructor: per-thread port_event_t buffers are
 * presumably freed elsewhere (see kqueue_free()). */
159     if (pthread_key_create(&kq->kq_port_event, NULL) != 0)
161     if ((pe = calloc(1, sizeof(*pe))) == NULL)
163     if (pthread_setspecific(kq->kq_port_event, pe) != 0)
/* Public constructor (exported symbol): allocate a kqueue object, create
 * the internal socketpair whose [1] end is handed to the application as
 * the "kqueue fd", register filters, and publish the object in the global
 * tree.  Returns the application fd, or (per the elided error paths,
 * presumably) -1 on failure. */
169 int __attribute__((visibility("default")))
175     kq = calloc(1, sizeof(*kq));
179     pthread_mutex_init(&kq->kq_mtx, NULL);
/* Debug tracing is toggled purely by the presence of $KQUEUE_DEBUG. */
184     KQUEUE_DEBUG = (getenv("KQUEUE_DEBUG") == NULL) ? 0 : 1;
/* kq_sockfd[0] stays inside the library (liveness probe in
 * kqueue_validate()); kq_sockfd[1] is returned to the caller. */
187     if (socketpair(AF_UNIX, SOCK_STREAM, 0, kq->kq_sockfd) < 0)
188         goto errout_unlocked;
190     if (kqueue_sys_init(kq) < 0)
191         goto errout_unlocked;
/* Publish under the write lock so lookups never see a half-built object. */
193     pthread_rwlock_wrlock(&kqtree_mtx);
196     /* TODO: move outside of the lock if it is safe */
197     if (filter_register_all(kq) < 0)
199     RB_INSERT(kqt, &kqtree, kq);
200     pthread_rwlock_unlock(&kqtree_mtx);
202     dbg_printf("created kqueue, fd=%d", kq->kq_sockfd[1]);
203     return (kq->kq_sockfd[1]);
/* Error path taken while holding kqtree_mtx (label elided in this view). */
206     pthread_rwlock_unlock(&kqtree_mtx);
/* Guard against double-closing when both slots hold the same fd
 * (e.g. socketpair never ran and the calloc'd zeros are still here —
 * TODO confirm intent in full source). */
209     if (kq->kq_sockfd[0] != kq->kq_sockfd[1]) {
211         (void)close(kq->kq_sockfd[0]);
212         (void)close(kq->kq_sockfd[1]);