Merge commit 'origin/master-tx'
[profile/ivi/pulseaudio-panda.git] / src / pulsecore / rtpoll.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as
9   published by the Free Software Foundation; either version 2.1 of the
10   License, or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   Lesser General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public
18   License along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <sys/types.h>
28 #include <stdio.h>
29 #include <signal.h>
30 #include <string.h>
31 #include <errno.h>
32
33 #ifdef __linux__
34 #include <sys/utsname.h>
35 #endif
36
37 #ifdef HAVE_POLL_H
38 #include <poll.h>
39 #else
40 #include <pulsecore/poll.h>
41 #endif
42
43 #include <pulse/xmalloc.h>
44 #include <pulse/timeval.h>
45
46 #include <pulsecore/core-error.h>
47 #include <pulsecore/rtclock.h>
48 #include <pulsecore/macro.h>
49 #include <pulsecore/llist.h>
50 #include <pulsecore/rtsig.h>
51 #include <pulsecore/flist.h>
52 #include <pulsecore/core-util.h>
53 #include <pulsecore/winsock.h>
54 #include <pulsecore/ratelimit.h>
55
56 #include "rtpoll.h"
57
58 /* #define DEBUG_TIMING */
59
60 struct pa_rtpoll {
61     struct pollfd *pollfd, *pollfd2;
62     unsigned n_pollfd_alloc, n_pollfd_used;
63
64     struct timeval next_elapse;
65     pa_bool_t timer_enabled:1;
66
67     pa_bool_t scan_for_dead:1;
68     pa_bool_t running:1;
69     pa_bool_t installed:1;
70     pa_bool_t rebuild_needed:1;
71     pa_bool_t quit:1;
72
73 #ifdef HAVE_PPOLL
74     pa_bool_t timer_armed:1;
75 #ifdef __linux__
76     pa_bool_t dont_use_ppoll:1;
77 #endif
78     int rtsig;
79     sigset_t sigset_unblocked;
80     timer_t timer;
81 #endif
82
83 #ifdef DEBUG_TIMING
84     pa_usec_t timestamp;
85     pa_usec_t slept, awake;
86 #endif
87
88     PA_LLIST_HEAD(pa_rtpoll_item, items);
89 };
90
91 struct pa_rtpoll_item {
92     pa_rtpoll *rtpoll;
93     pa_bool_t dead;
94
95     pa_rtpoll_priority_t priority;
96
97     struct pollfd *pollfd;
98     unsigned n_pollfd;
99
100     int (*work_cb)(pa_rtpoll_item *i);
101     int (*before_cb)(pa_rtpoll_item *i);
102     void (*after_cb)(pa_rtpoll_item *i);
103     void *userdata;
104
105     PA_LLIST_FIELDS(pa_rtpoll_item);
106 };
107
108 PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);
109
110 static void signal_handler_noop(int s) { /* write(2, "signal\n", 7); */ }
111
112 pa_rtpoll *pa_rtpoll_new(void) {
113     pa_rtpoll *p;
114
115     p = pa_xnew(pa_rtpoll, 1);
116
117 #ifdef HAVE_PPOLL
118
119 #ifdef __linux__
120     /* ppoll is broken on Linux < 2.6.16 */
121     p->dont_use_ppoll = FALSE;
122
123     {
124         struct utsname u;
125         unsigned major, minor, micro;
126
127         pa_assert_se(uname(&u) == 0);
128
129         if (sscanf(u.release, "%u.%u.%u", &major, &minor, &micro) != 3 ||
130             (major < 2) ||
131             (major == 2 && minor < 6) ||
132             (major == 2 && minor == 6 && micro < 16))
133
134             p->dont_use_ppoll = TRUE;
135     }
136
137 #endif
138
139     p->rtsig = -1;
140     sigemptyset(&p->sigset_unblocked);
141     p->timer = (timer_t) -1;
142     p->timer_armed = FALSE;
143
144 #endif
145
146     p->n_pollfd_alloc = 32;
147     p->pollfd = pa_xnew(struct pollfd, p->n_pollfd_alloc);
148     p->pollfd2 = pa_xnew(struct pollfd, p->n_pollfd_alloc);
149     p->n_pollfd_used = 0;
150
151     memset(&p->next_elapse, 0, sizeof(p->next_elapse));
152     p->timer_enabled = FALSE;
153
154     p->running = FALSE;
155     p->installed = FALSE;
156     p->scan_for_dead = FALSE;
157     p->rebuild_needed = FALSE;
158     p->quit = FALSE;
159
160     PA_LLIST_HEAD_INIT(pa_rtpoll_item, p->items);
161
162 #ifdef DEBUG_TIMING
163     p->timestamp = pa_rtclock_usec();
164     p->slept = p->awake = 0;
165 #endif
166
167     return p;
168 }
169
170 void pa_rtpoll_install(pa_rtpoll *p) {
171     pa_assert(p);
172     pa_assert(!p->installed);
173
174     p->installed = TRUE;
175
176 #ifdef HAVE_PPOLL
177 # ifdef __linux__
178     if (p->dont_use_ppoll)
179         return;
180 # endif
181
182     if ((p->rtsig = pa_rtsig_get_for_thread()) < 0) {
183         pa_log_warn("Failed to reserve POSIX realtime signal.");
184         return;
185     }
186
187     pa_log_debug("Acquired POSIX realtime signal %s", pa_sig2str(p->rtsig));
188
189     {
190         sigset_t ss;
191         struct sigaction sa;
192
193         pa_assert_se(sigemptyset(&ss) == 0);
194         pa_assert_se(sigaddset(&ss, p->rtsig) == 0);
195         pa_assert_se(pthread_sigmask(SIG_BLOCK, &ss, &p->sigset_unblocked) == 0);
196         pa_assert_se(sigdelset(&p->sigset_unblocked, p->rtsig) == 0);
197
198         memset(&sa, 0, sizeof(sa));
199         sa.sa_handler = signal_handler_noop;
200         pa_assert_se(sigemptyset(&sa.sa_mask) == 0);
201
202         pa_assert_se(sigaction(p->rtsig, &sa, NULL) == 0);
203
204         /* We never reset the signal handler. Why should we? */
205     }
206
207 #endif
208 }
209
210 static void rtpoll_rebuild(pa_rtpoll *p) {
211
212     struct pollfd *e, *t;
213     pa_rtpoll_item *i;
214     int ra = 0;
215
216     pa_assert(p);
217
218     p->rebuild_needed = FALSE;
219
220     if (p->n_pollfd_used > p->n_pollfd_alloc) {
221         /* Hmm, we have to allocate some more space */
222         p->n_pollfd_alloc = p->n_pollfd_used * 2;
223         p->pollfd2 = pa_xrealloc(p->pollfd2, p->n_pollfd_alloc * sizeof(struct pollfd));
224         ra = 1;
225     }
226
227     e = p->pollfd2;
228
229     for (i = p->items; i; i = i->next) {
230
231         if (i->n_pollfd > 0)  {
232             size_t l = i->n_pollfd * sizeof(struct pollfd);
233
234             if (i->pollfd)
235                 memcpy(e, i->pollfd, l);
236             else
237                 memset(e, 0, l);
238
239             i->pollfd = e;
240         } else
241             i->pollfd = NULL;
242
243         e += i->n_pollfd;
244     }
245
246     pa_assert((unsigned) (e - p->pollfd2) == p->n_pollfd_used);
247     t = p->pollfd;
248     p->pollfd = p->pollfd2;
249     p->pollfd2 = t;
250
251     if (ra)
252         p->pollfd2 = pa_xrealloc(p->pollfd2, p->n_pollfd_alloc * sizeof(struct pollfd));
253
254 }
255
256 static void rtpoll_item_destroy(pa_rtpoll_item *i) {
257     pa_rtpoll *p;
258
259     pa_assert(i);
260
261     p = i->rtpoll;
262
263     PA_LLIST_REMOVE(pa_rtpoll_item, p->items, i);
264
265     p->n_pollfd_used -= i->n_pollfd;
266
267     if (pa_flist_push(PA_STATIC_FLIST_GET(items), i) < 0)
268         pa_xfree(i);
269
270     p->rebuild_needed = TRUE;
271 }
272
273 void pa_rtpoll_free(pa_rtpoll *p) {
274     pa_assert(p);
275
276     while (p->items)
277         rtpoll_item_destroy(p->items);
278
279     pa_xfree(p->pollfd);
280     pa_xfree(p->pollfd2);
281
282 #ifdef HAVE_PPOLL
283     if (p->timer != (timer_t) -1)
284         timer_delete(p->timer);
285 #endif
286
287     pa_xfree(p);
288 }
289
290 static void reset_revents(pa_rtpoll_item *i) {
291     struct pollfd *f;
292     unsigned n;
293
294     pa_assert(i);
295
296     if (!(f = pa_rtpoll_item_get_pollfd(i, &n)))
297         return;
298
299     for (; n > 0; n--)
300         f[n-1].revents = 0;
301 }
302
303 static void reset_all_revents(pa_rtpoll *p) {
304     pa_rtpoll_item *i;
305
306     pa_assert(p);
307
308     for (i = p->items; i; i = i->next) {
309
310         if (i->dead)
311             continue;
312
313         reset_revents(i);
314     }
315 }
316
317 int pa_rtpoll_run(pa_rtpoll *p, pa_bool_t wait) {
318     pa_rtpoll_item *i;
319     int r = 0;
320     struct timeval timeout;
321
322     pa_assert(p);
323     pa_assert(!p->running);
324     pa_assert(p->installed);
325
326     p->running = TRUE;
327
328     /* First, let's do some work */
329     for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
330         int k;
331
332         if (i->dead)
333             continue;
334
335         if (!i->work_cb)
336             continue;
337
338         if (p->quit)
339             goto finish;
340
341         if ((k = i->work_cb(i)) != 0) {
342             if (k < 0)
343                 r = k;
344
345             goto finish;
346         }
347     }
348
349     /* Now let's prepare for entering the sleep */
350     for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
351         int k = 0;
352
353         if (i->dead)
354             continue;
355
356         if (!i->before_cb)
357             continue;
358
359         if (p->quit || (k = i->before_cb(i)) != 0) {
360
361             /* Hmm, this one doesn't let us enter the poll, so rewind everything */
362
363             for (i = i->prev; i; i = i->prev) {
364
365                 if (i->dead)
366                     continue;
367
368                 if (!i->after_cb)
369                     continue;
370
371                 i->after_cb(i);
372             }
373
374             if (k < 0)
375                 r = k;
376
377             goto finish;
378         }
379     }
380
381     if (p->rebuild_needed)
382         rtpoll_rebuild(p);
383
384     memset(&timeout, 0, sizeof(timeout));
385
386     /* Calculate timeout */
387     if (wait && !p->quit && p->timer_enabled) {
388         struct timeval now;
389         pa_rtclock_get(&now);
390
391         if (pa_timeval_cmp(&p->next_elapse, &now) > 0)
392             pa_timeval_add(&timeout, pa_timeval_diff(&p->next_elapse, &now));
393     }
394
395 #ifdef DEBUG_TIMING
396     {
397         pa_usec_t now = pa_rtclock_usec();
398         p->awake = now - p->timestamp;
399         p->timestamp = now;
400     }
401 #endif
402
403     /* OK, now let's sleep */
404 #ifdef HAVE_PPOLL
405
406 #ifdef __linux__
407     if (!p->dont_use_ppoll)
408 #endif
409     {
410         struct timespec ts;
411         ts.tv_sec = timeout.tv_sec;
412         ts.tv_nsec = timeout.tv_usec * 1000;
413         r = ppoll(p->pollfd, p->n_pollfd_used, (!wait || p->quit || p->timer_enabled) ? &ts : NULL, p->rtsig < 0 ? NULL : &p->sigset_unblocked);
414     }
415 #ifdef __linux__
416     else
417 #endif
418
419 #endif
420         r = poll(p->pollfd, p->n_pollfd_used, (!wait || p->quit || p->timer_enabled) ? (int) ((timeout.tv_sec*1000) + (timeout.tv_usec / 1000)) : -1);
421
422 #ifdef DEBUG_TIMING
423     {
424         pa_usec_t now = pa_rtclock_usec();
425         p->slept = now - p->timestamp;
426         p->timestamp = now;
427
428         pa_log("Process time %llu ms; sleep time %llu ms",
429                (unsigned long long) (p->awake / PA_USEC_PER_MSEC),
430                (unsigned long long) (p->slept / PA_USEC_PER_MSEC));
431     }
432 #endif
433
434     if (r < 0) {
435         if (errno == EAGAIN || errno == EINTR)
436             r = 0;
437         else
438             pa_log_error("poll(): %s", pa_cstrerror(errno));
439
440         reset_all_revents(p);
441     }
442
443     /* Let's tell everyone that we left the sleep */
444     for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
445
446         if (i->dead)
447             continue;
448
449         if (!i->after_cb)
450             continue;
451
452         i->after_cb(i);
453     }
454
455 finish:
456
457     p->running = FALSE;
458
459     if (p->scan_for_dead) {
460         pa_rtpoll_item *n;
461
462         p->scan_for_dead = FALSE;
463
464         for (i = p->items; i; i = n) {
465             n = i->next;
466
467             if (i->dead)
468                 rtpoll_item_destroy(i);
469         }
470     }
471
472     return r < 0 ? r : !p->quit;
473 }
474
475 static void update_timer(pa_rtpoll *p) {
476     pa_assert(p);
477
478 #ifdef HAVE_PPOLL
479
480 #ifdef __linux__
481     if (p->dont_use_ppoll)
482         return;
483 #endif
484
485     if (p->timer == (timer_t) -1) {
486         struct sigevent se;
487
488         memset(&se, 0, sizeof(se));
489         se.sigev_notify = SIGEV_SIGNAL;
490         se.sigev_signo = p->rtsig;
491
492         if (timer_create(CLOCK_MONOTONIC, &se, &p->timer) < 0)
493             if (timer_create(CLOCK_REALTIME, &se, &p->timer) < 0) {
494                 pa_log_warn("Failed to allocate POSIX timer: %s", pa_cstrerror(errno));
495                 p->timer = (timer_t) -1;
496             }
497     }
498
499     if (p->timer != (timer_t) -1) {
500         struct itimerspec its;
501         struct timespec ts = { .tv_sec = 0, .tv_nsec = 0 };
502         sigset_t ss;
503
504         if (p->timer_armed) {
505             /* First disarm timer */
506             memset(&its, 0, sizeof(its));
507             pa_assert_se(timer_settime(p->timer, TIMER_ABSTIME, &its, NULL) == 0);
508
509             /* Remove a signal that might be waiting in the signal q */
510             pa_assert_se(sigemptyset(&ss) == 0);
511             pa_assert_se(sigaddset(&ss, p->rtsig) == 0);
512             sigtimedwait(&ss, NULL, &ts);
513         }
514
515         /* And install the new timer */
516         if (p->timer_enabled) {
517             memset(&its, 0, sizeof(its));
518
519             its.it_value.tv_sec = p->next_elapse.tv_sec;
520             its.it_value.tv_nsec = p->next_elapse.tv_usec*1000;
521
522             /* Make sure that 0,0 is not understood as
523              * "disarming" */
524             if (its.it_value.tv_sec == 0 && its.it_value.tv_nsec == 0)
525                 its.it_value.tv_nsec = 1;
526             pa_assert_se(timer_settime(p->timer, TIMER_ABSTIME, &its, NULL) == 0);
527         }
528
529         p->timer_armed = p->timer_enabled;
530     }
531
532 #endif
533 }
534
535 void pa_rtpoll_set_timer_absolute(pa_rtpoll *p, pa_usec_t usec) {
536     pa_assert(p);
537
538     pa_timeval_store(&p->next_elapse, usec);
539     p->timer_enabled = TRUE;
540
541     update_timer(p);
542 }
543
544 void pa_rtpoll_set_timer_relative(pa_rtpoll *p, pa_usec_t usec) {
545     pa_assert(p);
546
547     /* Scheduling a timeout for more than an hour is very very suspicious */
548     pa_assert(usec <= PA_USEC_PER_SEC*60ULL*60ULL);
549
550     pa_rtclock_get(&p->next_elapse);
551     pa_timeval_add(&p->next_elapse, usec);
552     p->timer_enabled = TRUE;
553
554     update_timer(p);
555 }
556
557 void pa_rtpoll_set_timer_disabled(pa_rtpoll *p) {
558     pa_assert(p);
559
560     memset(&p->next_elapse, 0, sizeof(p->next_elapse));
561     p->timer_enabled = FALSE;
562
563     update_timer(p);
564 }
565
566 pa_rtpoll_item *pa_rtpoll_item_new(pa_rtpoll *p, pa_rtpoll_priority_t prio, unsigned n_fds) {
567     pa_rtpoll_item *i, *j, *l = NULL;
568
569     pa_assert(p);
570
571     if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
572         i = pa_xnew(pa_rtpoll_item, 1);
573
574     i->rtpoll = p;
575     i->dead = FALSE;
576     i->n_pollfd = n_fds;
577     i->pollfd = NULL;
578     i->priority = prio;
579
580     i->userdata = NULL;
581     i->before_cb = NULL;
582     i->after_cb = NULL;
583     i->work_cb = NULL;
584
585     for (j = p->items; j; j = j->next) {
586         if (prio <= j->priority)
587             break;
588
589         l = j;
590     }
591
592     PA_LLIST_INSERT_AFTER(pa_rtpoll_item, p->items, j ? j->prev : l, i);
593
594     if (n_fds > 0) {
595         p->rebuild_needed = 1;
596         p->n_pollfd_used += n_fds;
597     }
598
599     return i;
600 }
601
602 void pa_rtpoll_item_free(pa_rtpoll_item *i) {
603     pa_assert(i);
604
605     if (i->rtpoll->running) {
606         i->dead = TRUE;
607         i->rtpoll->scan_for_dead = TRUE;
608         return;
609     }
610
611     rtpoll_item_destroy(i);
612 }
613
614 struct pollfd *pa_rtpoll_item_get_pollfd(pa_rtpoll_item *i, unsigned *n_fds) {
615     pa_assert(i);
616
617     if (i->n_pollfd > 0)
618         if (i->rtpoll->rebuild_needed)
619             rtpoll_rebuild(i->rtpoll);
620
621     if (n_fds)
622         *n_fds = i->n_pollfd;
623
624     return i->pollfd;
625 }
626
627 void pa_rtpoll_item_set_before_callback(pa_rtpoll_item *i, int (*before_cb)(pa_rtpoll_item *i)) {
628     pa_assert(i);
629     pa_assert(i->priority < PA_RTPOLL_NEVER);
630
631     i->before_cb = before_cb;
632 }
633
634 void pa_rtpoll_item_set_after_callback(pa_rtpoll_item *i, void (*after_cb)(pa_rtpoll_item *i)) {
635     pa_assert(i);
636     pa_assert(i->priority < PA_RTPOLL_NEVER);
637
638     i->after_cb = after_cb;
639 }
640
641 void pa_rtpoll_item_set_work_callback(pa_rtpoll_item *i, int (*work_cb)(pa_rtpoll_item *i)) {
642     pa_assert(i);
643     pa_assert(i->priority < PA_RTPOLL_NEVER);
644
645     i->work_cb = work_cb;
646 }
647
648 void pa_rtpoll_item_set_userdata(pa_rtpoll_item *i, void *userdata) {
649     pa_assert(i);
650
651     i->userdata = userdata;
652 }
653
654 void* pa_rtpoll_item_get_userdata(pa_rtpoll_item *i) {
655     pa_assert(i);
656
657     return i->userdata;
658 }
659
660 static int fdsem_before(pa_rtpoll_item *i) {
661
662     if (pa_fdsem_before_poll(i->userdata) < 0)
663         return 1; /* 1 means immediate restart of the loop */
664
665     return 0;
666 }
667
668 static void fdsem_after(pa_rtpoll_item *i) {
669     pa_assert(i);
670
671     pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
672     pa_fdsem_after_poll(i->userdata);
673 }
674
675 pa_rtpoll_item *pa_rtpoll_item_new_fdsem(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_fdsem *f) {
676     pa_rtpoll_item *i;
677     struct pollfd *pollfd;
678
679     pa_assert(p);
680     pa_assert(f);
681
682     i = pa_rtpoll_item_new(p, prio, 1);
683
684     pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
685
686     pollfd->fd = pa_fdsem_get(f);
687     pollfd->events = POLLIN;
688
689     i->before_cb = fdsem_before;
690     i->after_cb = fdsem_after;
691     i->userdata = f;
692
693     return i;
694 }
695
696 static int asyncmsgq_read_before(pa_rtpoll_item *i) {
697     pa_assert(i);
698
699     if (pa_asyncmsgq_read_before_poll(i->userdata) < 0)
700         return 1; /* 1 means immediate restart of the loop */
701
702     return 0;
703 }
704
705 static void asyncmsgq_read_after(pa_rtpoll_item *i) {
706     pa_assert(i);
707
708     pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
709     pa_asyncmsgq_read_after_poll(i->userdata);
710 }
711
712 static int asyncmsgq_read_work(pa_rtpoll_item *i) {
713     pa_msgobject *object;
714     int code;
715     void *data;
716     pa_memchunk chunk;
717     int64_t offset;
718
719     pa_assert(i);
720
721     if (pa_asyncmsgq_get(i->userdata, &object, &code, &data, &offset, &chunk, 0) == 0) {
722         int ret;
723
724         if (!object && code == PA_MESSAGE_SHUTDOWN) {
725             pa_asyncmsgq_done(i->userdata, 0);
726             pa_rtpoll_quit(i->rtpoll);
727             return 1;
728         }
729
730         ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
731         pa_asyncmsgq_done(i->userdata, ret);
732         return 1;
733     }
734
735     return 0;
736 }
737
738 pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_read(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) {
739     pa_rtpoll_item *i;
740     struct pollfd *pollfd;
741
742     pa_assert(p);
743     pa_assert(q);
744
745     i = pa_rtpoll_item_new(p, prio, 1);
746
747     pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
748     pollfd->fd = pa_asyncmsgq_read_fd(q);
749     pollfd->events = POLLIN;
750
751     i->before_cb = asyncmsgq_read_before;
752     i->after_cb = asyncmsgq_read_after;
753     i->work_cb = asyncmsgq_read_work;
754     i->userdata = q;
755
756     return i;
757 }
758
759 static int asyncmsgq_write_before(pa_rtpoll_item *i) {
760     pa_assert(i);
761
762     pa_asyncmsgq_write_before_poll(i->userdata);
763     return 0;
764 }
765
766 static void asyncmsgq_write_after(pa_rtpoll_item *i) {
767     pa_assert(i);
768
769     pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
770     pa_asyncmsgq_write_after_poll(i->userdata);
771 }
772
773 pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_write(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) {
774     pa_rtpoll_item *i;
775     struct pollfd *pollfd;
776
777     pa_assert(p);
778     pa_assert(q);
779
780     i = pa_rtpoll_item_new(p, prio, 1);
781
782     pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
783     pollfd->fd = pa_asyncmsgq_write_fd(q);
784     pollfd->events = POLLIN;
785
786     i->before_cb = asyncmsgq_write_before;
787     i->after_cb = asyncmsgq_write_after;
788     i->work_cb = NULL;
789     i->userdata = q;
790
791     return i;
792 }
793
794 void pa_rtpoll_quit(pa_rtpoll *p) {
795     pa_assert(p);
796
797     p->quit = TRUE;
798 }