#define _GNU_SOURCE
#include <poll.h>
#include <unistd.h>
+#include <assert.h>
#include <signal.h>
#include <pthread.h>
#include <sys/epoll.h>
close(ctx.sfd[0]);
}
+enum {
+ EPOLL60_EVENTS_NR = 10,
+};
+
+struct epoll60_ctx {
+ volatile int stopped;
+ int ready;
+ int waiters;
+ int epfd;
+ int evfd[EPOLL60_EVENTS_NR];
+};
+
+static void *epoll60_wait_thread(void *ctx_)
+{
+ struct epoll60_ctx *ctx = ctx_;
+ struct epoll_event e;
+ sigset_t sigmask;
+ uint64_t v;
+ int ret;
+
+ /* Block SIGUSR1 */
+ sigemptyset(&sigmask);
+ sigaddset(&sigmask, SIGUSR1);
+ sigprocmask(SIG_SETMASK, &sigmask, NULL);
+
+ /* Prepare empty mask for epoll_pwait() */
+ sigemptyset(&sigmask);
+
+ while (!ctx->stopped) {
+ /* Mark we are ready */
+ __atomic_fetch_add(&ctx->ready, 1, __ATOMIC_ACQUIRE);
+
+ /* Start when all are ready */
+ while (__atomic_load_n(&ctx->ready, __ATOMIC_ACQUIRE) &&
+ !ctx->stopped);
+
+ /* Account this waiter */
+ __atomic_fetch_add(&ctx->waiters, 1, __ATOMIC_ACQUIRE);
+
+ ret = epoll_pwait(ctx->epfd, &e, 1, 2000, &sigmask);
+ if (ret != 1) {
+ /* We expect only signal delivery on stop */
+ assert(ret < 0 && errno == EINTR && "Lost wakeup!\n");
+ assert(ctx->stopped);
+ break;
+ }
+
+ ret = read(e.data.fd, &v, sizeof(v));
+ /* Since we are on ET mode, thus each thread gets its own fd. */
+ assert(ret == sizeof(v));
+
+ __atomic_fetch_sub(&ctx->waiters, 1, __ATOMIC_RELEASE);
+ }
+
+ return NULL;
+}
+
+static inline unsigned long long msecs(void)
+{
+ struct timespec ts;
+ unsigned long long msecs;
+
+ clock_gettime(CLOCK_REALTIME, &ts);
+ msecs = ts.tv_sec * 1000ull;
+ msecs += ts.tv_nsec / 1000000ull;
+
+ return msecs;
+}
+
+static inline int count_waiters(struct epoll60_ctx *ctx)
+{
+ return __atomic_load_n(&ctx->waiters, __ATOMIC_ACQUIRE);
+}
+
+TEST(epoll60)
+{
+ struct epoll60_ctx ctx = { 0 };
+ pthread_t waiters[ARRAY_SIZE(ctx.evfd)];
+ struct epoll_event e;
+ int i, n, ret;
+
+ signal(SIGUSR1, signal_handler);
+
+ ctx.epfd = epoll_create1(0);
+ ASSERT_GE(ctx.epfd, 0);
+
+ /* Create event fds */
+ for (i = 0; i < ARRAY_SIZE(ctx.evfd); i++) {
+ ctx.evfd[i] = eventfd(0, EFD_NONBLOCK);
+ ASSERT_GE(ctx.evfd[i], 0);
+
+ e.events = EPOLLIN | EPOLLET;
+ e.data.fd = ctx.evfd[i];
+ ASSERT_EQ(epoll_ctl(ctx.epfd, EPOLL_CTL_ADD, ctx.evfd[i], &e), 0);
+ }
+
+ /* Create waiter threads */
+ for (i = 0; i < ARRAY_SIZE(waiters); i++)
+ ASSERT_EQ(pthread_create(&waiters[i], NULL,
+ epoll60_wait_thread, &ctx), 0);
+
+ for (i = 0; i < 300; i++) {
+ uint64_t v = 1, ms;
+
+ /* Wait for all to be ready */
+ while (__atomic_load_n(&ctx.ready, __ATOMIC_ACQUIRE) !=
+ ARRAY_SIZE(ctx.evfd))
+ ;
+
+ /* Steady, go */
+ __atomic_fetch_sub(&ctx.ready, ARRAY_SIZE(ctx.evfd),
+ __ATOMIC_ACQUIRE);
+
+ /* Wait all have gone to kernel */
+ while (count_waiters(&ctx) != ARRAY_SIZE(ctx.evfd))
+ ;
+
+ /* 1ms should be enough to schedule away */
+ usleep(1000);
+
+ /* Quickly signal all handles at once */
+ for (n = 0; n < ARRAY_SIZE(ctx.evfd); n++) {
+ ret = write(ctx.evfd[n], &v, sizeof(v));
+ ASSERT_EQ(ret, sizeof(v));
+ }
+
+ /* Busy loop for 1s and wait for all waiters to wake up */
+ ms = msecs();
+ while (count_waiters(&ctx) && msecs() < ms + 1000)
+ ;
+
+ ASSERT_EQ(count_waiters(&ctx), 0);
+ }
+ ctx.stopped = 1;
+ /* Stop waiters */
+ for (i = 0; i < ARRAY_SIZE(waiters); i++)
+ ret = pthread_kill(waiters[i], SIGUSR1);
+ for (i = 0; i < ARRAY_SIZE(waiters); i++)
+ pthread_join(waiters[i], NULL);
+
+ for (i = 0; i < ARRAY_SIZE(waiters); i++)
+ close(ctx.evfd[i]);
+ close(ctx.epfd);
+}
+
TEST_HARNESS_MAIN