Merge commit 'origin/master-tx'
[platform/upstream/pulseaudio.git] / src / pulsecore / rtpoll.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as
9   published by the Free Software Foundation; either version 2.1 of the
10   License, or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   Lesser General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public
18   License along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <sys/types.h>
28 #include <stdio.h>
29 #include <signal.h>
30 #include <string.h>
31 #include <errno.h>
32
33 #ifdef HAVE_POLL_H
34 #include <poll.h>
35 #else
36 #include <pulsecore/poll.h>
37 #endif
38
39 #include <pulse/xmalloc.h>
40 #include <pulse/timeval.h>
41
42 #include <pulsecore/core-error.h>
43 #include <pulsecore/core-rtclock.h>
44 #include <pulsecore/macro.h>
45 #include <pulsecore/llist.h>
46 #include <pulsecore/flist.h>
47 #include <pulsecore/core-util.h>
48 #include <pulsecore/winsock.h>
49 #include <pulsecore/ratelimit.h>
50
51 #include "rtpoll.h"
52
53 /* #define DEBUG_TIMING */
54
55 struct pa_rtpoll {
56     struct pollfd *pollfd, *pollfd2;
57     unsigned n_pollfd_alloc, n_pollfd_used;
58
59     struct timeval next_elapse;
60     pa_bool_t timer_enabled:1;
61
62     pa_bool_t scan_for_dead:1;
63     pa_bool_t running:1;
64     pa_bool_t rebuild_needed:1;
65     pa_bool_t quit:1;
66
67 #ifdef DEBUG_TIMING
68     pa_usec_t timestamp;
69     pa_usec_t slept, awake;
70 #endif
71
72     PA_LLIST_HEAD(pa_rtpoll_item, items);
73 };
74
75 struct pa_rtpoll_item {
76     pa_rtpoll *rtpoll;
77     pa_bool_t dead;
78
79     pa_rtpoll_priority_t priority;
80
81     struct pollfd *pollfd;
82     unsigned n_pollfd;
83
84     int (*work_cb)(pa_rtpoll_item *i);
85     int (*before_cb)(pa_rtpoll_item *i);
86     void (*after_cb)(pa_rtpoll_item *i);
87     void *userdata;
88
89     PA_LLIST_FIELDS(pa_rtpoll_item);
90 };
91
92 PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);
93
94 pa_rtpoll *pa_rtpoll_new(void) {
95     pa_rtpoll *p;
96
97     p = pa_xnew(pa_rtpoll, 1);
98
99     p->n_pollfd_alloc = 32;
100     p->pollfd = pa_xnew(struct pollfd, p->n_pollfd_alloc);
101     p->pollfd2 = pa_xnew(struct pollfd, p->n_pollfd_alloc);
102     p->n_pollfd_used = 0;
103
104     pa_zero(p->next_elapse);
105     p->timer_enabled = FALSE;
106
107     p->running = FALSE;
108     p->scan_for_dead = FALSE;
109     p->rebuild_needed = FALSE;
110     p->quit = FALSE;
111
112     PA_LLIST_HEAD_INIT(pa_rtpoll_item, p->items);
113
114 #ifdef DEBUG_TIMING
115     p->timestamp = pa_rtclock_now();
116     p->slept = p->awake = 0;
117 #endif
118
119     return p;
120 }
121
122 static void rtpoll_rebuild(pa_rtpoll *p) {
123
124     struct pollfd *e, *t;
125     pa_rtpoll_item *i;
126     int ra = 0;
127
128     pa_assert(p);
129
130     p->rebuild_needed = FALSE;
131
132     if (p->n_pollfd_used > p->n_pollfd_alloc) {
133         /* Hmm, we have to allocate some more space */
134         p->n_pollfd_alloc = p->n_pollfd_used * 2;
135         p->pollfd2 = pa_xrealloc(p->pollfd2, p->n_pollfd_alloc * sizeof(struct pollfd));
136         ra = 1;
137     }
138
139     e = p->pollfd2;
140
141     for (i = p->items; i; i = i->next) {
142
143         if (i->n_pollfd > 0)  {
144             size_t l = i->n_pollfd * sizeof(struct pollfd);
145
146             if (i->pollfd)
147                 memcpy(e, i->pollfd, l);
148             else
149                 memset(e, 0, l);
150
151             i->pollfd = e;
152         } else
153             i->pollfd = NULL;
154
155         e += i->n_pollfd;
156     }
157
158     pa_assert((unsigned) (e - p->pollfd2) == p->n_pollfd_used);
159     t = p->pollfd;
160     p->pollfd = p->pollfd2;
161     p->pollfd2 = t;
162
163     if (ra)
164         p->pollfd2 = pa_xrealloc(p->pollfd2, p->n_pollfd_alloc * sizeof(struct pollfd));
165 }
166
167 static void rtpoll_item_destroy(pa_rtpoll_item *i) {
168     pa_rtpoll *p;
169
170     pa_assert(i);
171
172     p = i->rtpoll;
173
174     PA_LLIST_REMOVE(pa_rtpoll_item, p->items, i);
175
176     p->n_pollfd_used -= i->n_pollfd;
177
178     if (pa_flist_push(PA_STATIC_FLIST_GET(items), i) < 0)
179         pa_xfree(i);
180
181     p->rebuild_needed = TRUE;
182 }
183
184 void pa_rtpoll_free(pa_rtpoll *p) {
185     pa_assert(p);
186
187     while (p->items)
188         rtpoll_item_destroy(p->items);
189
190     pa_xfree(p->pollfd);
191     pa_xfree(p->pollfd2);
192
193     pa_xfree(p);
194 }
195
196 static void reset_revents(pa_rtpoll_item *i) {
197     struct pollfd *f;
198     unsigned n;
199
200     pa_assert(i);
201
202     if (!(f = pa_rtpoll_item_get_pollfd(i, &n)))
203         return;
204
205     for (; n > 0; n--)
206         f[n-1].revents = 0;
207 }
208
209 static void reset_all_revents(pa_rtpoll *p) {
210     pa_rtpoll_item *i;
211
212     pa_assert(p);
213
214     for (i = p->items; i; i = i->next) {
215
216         if (i->dead)
217             continue;
218
219         reset_revents(i);
220     }
221 }
222
223 int pa_rtpoll_run(pa_rtpoll *p, pa_bool_t wait_op) {
224     pa_rtpoll_item *i;
225     int r = 0;
226     struct timeval timeout;
227
228     pa_assert(p);
229     pa_assert(!p->running);
230
231     p->running = TRUE;
232
233     /* First, let's do some work */
234     for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
235         int k;
236
237         if (i->dead)
238             continue;
239
240         if (!i->work_cb)
241             continue;
242
243         if (p->quit)
244             goto finish;
245
246         if ((k = i->work_cb(i)) != 0) {
247             if (k < 0)
248                 r = k;
249
250             goto finish;
251         }
252     }
253
254     /* Now let's prepare for entering the sleep */
255     for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
256         int k = 0;
257
258         if (i->dead)
259             continue;
260
261         if (!i->before_cb)
262             continue;
263
264         if (p->quit || (k = i->before_cb(i)) != 0) {
265
266             /* Hmm, this one doesn't let us enter the poll, so rewind everything */
267
268             for (i = i->prev; i; i = i->prev) {
269
270                 if (i->dead)
271                     continue;
272
273                 if (!i->after_cb)
274                     continue;
275
276                 i->after_cb(i);
277             }
278
279             if (k < 0)
280                 r = k;
281
282             goto finish;
283         }
284     }
285
286     if (p->rebuild_needed)
287         rtpoll_rebuild(p);
288
289     memset(&timeout, 0, sizeof(timeout));
290
291     /* Calculate timeout */
292     if (wait_op && !p->quit && p->timer_enabled) {
293         struct timeval now;
294         pa_rtclock_get(&now);
295
296         if (pa_timeval_cmp(&p->next_elapse, &now) > 0)
297             pa_timeval_add(&timeout, pa_timeval_diff(&p->next_elapse, &now));
298     }
299
300 #ifdef DEBUG_TIMING
301     {
302         pa_usec_t now = pa_rtclock_now();
303         p->awake = now - p->timestamp;
304         p->timestamp = now;
305     }
306 #endif
307
308     /* OK, now let's sleep */
309 #ifdef HAVE_PPOLL
310     {
311         struct timespec ts;
312         ts.tv_sec = timeout.tv_sec;
313         ts.tv_nsec = timeout.tv_usec * 1000;
314         r = ppoll(p->pollfd, p->n_pollfd_used, (!wait_op || p->quit || p->timer_enabled) ? &ts : NULL, NULL);
315     }
316 #else
317         r = poll(p->pollfd, p->n_pollfd_used, (!wait_op || p->quit || p->timer_enabled) ? (int) ((timeout.tv_sec*1000) + (timeout.tv_usec / 1000)) : -1);
318 #endif
319
320 #ifdef DEBUG_TIMING
321     {
322         pa_usec_t now = pa_rtclock_now();
323         p->slept = now - p->timestamp;
324         p->timestamp = now;
325
326         pa_log("Process time %llu ms; sleep time %llu ms",
327                (unsigned long long) (p->awake / PA_USEC_PER_MSEC),
328                (unsigned long long) (p->slept / PA_USEC_PER_MSEC));
329     }
330 #endif
331
332     if (r < 0) {
333         if (errno == EAGAIN || errno == EINTR)
334             r = 0;
335         else
336             pa_log_error("poll(): %s", pa_cstrerror(errno));
337
338         reset_all_revents(p);
339     }
340
341     /* Let's tell everyone that we left the sleep */
342     for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
343
344         if (i->dead)
345             continue;
346
347         if (!i->after_cb)
348             continue;
349
350         i->after_cb(i);
351     }
352
353 finish:
354
355     p->running = FALSE;
356
357     if (p->scan_for_dead) {
358         pa_rtpoll_item *n;
359
360         p->scan_for_dead = FALSE;
361
362         for (i = p->items; i; i = n) {
363             n = i->next;
364
365             if (i->dead)
366                 rtpoll_item_destroy(i);
367         }
368     }
369
370     return r < 0 ? r : !p->quit;
371 }
372
373 void pa_rtpoll_set_timer_absolute(pa_rtpoll *p, pa_usec_t usec) {
374     pa_assert(p);
375
376     pa_timeval_store(&p->next_elapse, usec);
377     p->timer_enabled = TRUE;
378 }
379
380 void pa_rtpoll_set_timer_relative(pa_rtpoll *p, pa_usec_t usec) {
381     pa_assert(p);
382
383     /* Scheduling a timeout for more than an hour is very very suspicious */
384     pa_assert(usec <= PA_USEC_PER_SEC*60ULL*60ULL);
385
386     pa_rtclock_get(&p->next_elapse);
387     pa_timeval_add(&p->next_elapse, usec);
388     p->timer_enabled = TRUE;
389 }
390
391 void pa_rtpoll_set_timer_disabled(pa_rtpoll *p) {
392     pa_assert(p);
393
394     memset(&p->next_elapse, 0, sizeof(p->next_elapse));
395     p->timer_enabled = FALSE;
396 }
397
398 pa_rtpoll_item *pa_rtpoll_item_new(pa_rtpoll *p, pa_rtpoll_priority_t prio, unsigned n_fds) {
399     pa_rtpoll_item *i, *j, *l = NULL;
400
401     pa_assert(p);
402
403     if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
404         i = pa_xnew(pa_rtpoll_item, 1);
405
406     i->rtpoll = p;
407     i->dead = FALSE;
408     i->n_pollfd = n_fds;
409     i->pollfd = NULL;
410     i->priority = prio;
411
412     i->userdata = NULL;
413     i->before_cb = NULL;
414     i->after_cb = NULL;
415     i->work_cb = NULL;
416
417     for (j = p->items; j; j = j->next) {
418         if (prio <= j->priority)
419             break;
420
421         l = j;
422     }
423
424     PA_LLIST_INSERT_AFTER(pa_rtpoll_item, p->items, j ? j->prev : l, i);
425
426     if (n_fds > 0) {
427         p->rebuild_needed = 1;
428         p->n_pollfd_used += n_fds;
429     }
430
431     return i;
432 }
433
434 void pa_rtpoll_item_free(pa_rtpoll_item *i) {
435     pa_assert(i);
436
437     if (i->rtpoll->running) {
438         i->dead = TRUE;
439         i->rtpoll->scan_for_dead = TRUE;
440         return;
441     }
442
443     rtpoll_item_destroy(i);
444 }
445
446 struct pollfd *pa_rtpoll_item_get_pollfd(pa_rtpoll_item *i, unsigned *n_fds) {
447     pa_assert(i);
448
449     if (i->n_pollfd > 0)
450         if (i->rtpoll->rebuild_needed)
451             rtpoll_rebuild(i->rtpoll);
452
453     if (n_fds)
454         *n_fds = i->n_pollfd;
455
456     return i->pollfd;
457 }
458
459 void pa_rtpoll_item_set_before_callback(pa_rtpoll_item *i, int (*before_cb)(pa_rtpoll_item *i)) {
460     pa_assert(i);
461     pa_assert(i->priority < PA_RTPOLL_NEVER);
462
463     i->before_cb = before_cb;
464 }
465
466 void pa_rtpoll_item_set_after_callback(pa_rtpoll_item *i, void (*after_cb)(pa_rtpoll_item *i)) {
467     pa_assert(i);
468     pa_assert(i->priority < PA_RTPOLL_NEVER);
469
470     i->after_cb = after_cb;
471 }
472
473 void pa_rtpoll_item_set_work_callback(pa_rtpoll_item *i, int (*work_cb)(pa_rtpoll_item *i)) {
474     pa_assert(i);
475     pa_assert(i->priority < PA_RTPOLL_NEVER);
476
477     i->work_cb = work_cb;
478 }
479
480 void pa_rtpoll_item_set_userdata(pa_rtpoll_item *i, void *userdata) {
481     pa_assert(i);
482
483     i->userdata = userdata;
484 }
485
486 void* pa_rtpoll_item_get_userdata(pa_rtpoll_item *i) {
487     pa_assert(i);
488
489     return i->userdata;
490 }
491
492 static int fdsem_before(pa_rtpoll_item *i) {
493
494     if (pa_fdsem_before_poll(i->userdata) < 0)
495         return 1; /* 1 means immediate restart of the loop */
496
497     return 0;
498 }
499
500 static void fdsem_after(pa_rtpoll_item *i) {
501     pa_assert(i);
502
503     pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
504     pa_fdsem_after_poll(i->userdata);
505 }
506
507 pa_rtpoll_item *pa_rtpoll_item_new_fdsem(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_fdsem *f) {
508     pa_rtpoll_item *i;
509     struct pollfd *pollfd;
510
511     pa_assert(p);
512     pa_assert(f);
513
514     i = pa_rtpoll_item_new(p, prio, 1);
515
516     pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
517
518     pollfd->fd = pa_fdsem_get(f);
519     pollfd->events = POLLIN;
520
521     i->before_cb = fdsem_before;
522     i->after_cb = fdsem_after;
523     i->userdata = f;
524
525     return i;
526 }
527
528 static int asyncmsgq_read_before(pa_rtpoll_item *i) {
529     pa_assert(i);
530
531     if (pa_asyncmsgq_read_before_poll(i->userdata) < 0)
532         return 1; /* 1 means immediate restart of the loop */
533
534     return 0;
535 }
536
537 static void asyncmsgq_read_after(pa_rtpoll_item *i) {
538     pa_assert(i);
539
540     pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
541     pa_asyncmsgq_read_after_poll(i->userdata);
542 }
543
544 static int asyncmsgq_read_work(pa_rtpoll_item *i) {
545     pa_msgobject *object;
546     int code;
547     void *data;
548     pa_memchunk chunk;
549     int64_t offset;
550
551     pa_assert(i);
552
553     if (pa_asyncmsgq_get(i->userdata, &object, &code, &data, &offset, &chunk, 0) == 0) {
554         int ret;
555
556         if (!object && code == PA_MESSAGE_SHUTDOWN) {
557             pa_asyncmsgq_done(i->userdata, 0);
558             pa_rtpoll_quit(i->rtpoll);
559             return 1;
560         }
561
562         ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
563         pa_asyncmsgq_done(i->userdata, ret);
564         return 1;
565     }
566
567     return 0;
568 }
569
570 pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_read(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) {
571     pa_rtpoll_item *i;
572     struct pollfd *pollfd;
573
574     pa_assert(p);
575     pa_assert(q);
576
577     i = pa_rtpoll_item_new(p, prio, 1);
578
579     pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
580     pollfd->fd = pa_asyncmsgq_read_fd(q);
581     pollfd->events = POLLIN;
582
583     i->before_cb = asyncmsgq_read_before;
584     i->after_cb = asyncmsgq_read_after;
585     i->work_cb = asyncmsgq_read_work;
586     i->userdata = q;
587
588     return i;
589 }
590
591 static int asyncmsgq_write_before(pa_rtpoll_item *i) {
592     pa_assert(i);
593
594     pa_asyncmsgq_write_before_poll(i->userdata);
595     return 0;
596 }
597
598 static void asyncmsgq_write_after(pa_rtpoll_item *i) {
599     pa_assert(i);
600
601     pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
602     pa_asyncmsgq_write_after_poll(i->userdata);
603 }
604
605 pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_write(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) {
606     pa_rtpoll_item *i;
607     struct pollfd *pollfd;
608
609     pa_assert(p);
610     pa_assert(q);
611
612     i = pa_rtpoll_item_new(p, prio, 1);
613
614     pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
615     pollfd->fd = pa_asyncmsgq_write_fd(q);
616     pollfd->events = POLLIN;
617
618     i->before_cb = asyncmsgq_write_before;
619     i->after_cb = asyncmsgq_write_after;
620     i->work_cb = NULL;
621     i->userdata = q;
622
623     return i;
624 }
625
626 void pa_rtpoll_quit(pa_rtpoll *p) {
627     pa_assert(p);
628
629     p->quit = TRUE;
630 }