introspect: Include whether a stream is corked in the info callback.
[platform/upstream/pulseaudio.git] / src / pulsecore / rtpoll.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as
9   published by the Free Software Foundation; either version 2.1 of the
10   License, or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   Lesser General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public
18   License along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <sys/types.h>
28 #include <stdio.h>
29 #include <signal.h>
30 #include <string.h>
31 #include <errno.h>
32
33 #include <pulse/xmalloc.h>
34 #include <pulse/timeval.h>
35
36 #include <pulsecore/poll.h>
37 #include <pulsecore/core-error.h>
38 #include <pulsecore/core-rtclock.h>
39 #include <pulsecore/macro.h>
40 #include <pulsecore/llist.h>
41 #include <pulsecore/flist.h>
42 #include <pulsecore/core-util.h>
43 #include <pulsecore/winsock.h>
44 #include <pulsecore/ratelimit.h>
45 #include <pulse/rtclock.h>
46
47 #include "rtpoll.h"
48
49 /* #define DEBUG_TIMING */
50
51 struct pa_rtpoll {
52     struct pollfd *pollfd, *pollfd2;
53     unsigned n_pollfd_alloc, n_pollfd_used;
54
55     struct timeval next_elapse;
56     pa_bool_t timer_enabled:1;
57
58     pa_bool_t scan_for_dead:1;
59     pa_bool_t running:1;
60     pa_bool_t rebuild_needed:1;
61     pa_bool_t quit:1;
62     pa_bool_t timer_elapsed:1;
63
64 #ifdef DEBUG_TIMING
65     pa_usec_t timestamp;
66     pa_usec_t slept, awake;
67 #endif
68
69     PA_LLIST_HEAD(pa_rtpoll_item, items);
70 };
71
72 struct pa_rtpoll_item {
73     pa_rtpoll *rtpoll;
74     pa_bool_t dead;
75
76     pa_rtpoll_priority_t priority;
77
78     struct pollfd *pollfd;
79     unsigned n_pollfd;
80
81     int (*work_cb)(pa_rtpoll_item *i);
82     int (*before_cb)(pa_rtpoll_item *i);
83     void (*after_cb)(pa_rtpoll_item *i);
84     void *userdata;
85
86     PA_LLIST_FIELDS(pa_rtpoll_item);
87 };
88
89 PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);
90
91 pa_rtpoll *pa_rtpoll_new(void) {
92     pa_rtpoll *p;
93
94     p = pa_xnew0(pa_rtpoll, 1);
95
96     p->n_pollfd_alloc = 32;
97     p->pollfd = pa_xnew(struct pollfd, p->n_pollfd_alloc);
98     p->pollfd2 = pa_xnew(struct pollfd, p->n_pollfd_alloc);
99
100 #ifdef DEBUG_TIMING
101     p->timestamp = pa_rtclock_now();
102 #endif
103
104     return p;
105 }
106
107 static void rtpoll_rebuild(pa_rtpoll *p) {
108
109     struct pollfd *e, *t;
110     pa_rtpoll_item *i;
111     int ra = 0;
112
113     pa_assert(p);
114
115     p->rebuild_needed = FALSE;
116
117     if (p->n_pollfd_used > p->n_pollfd_alloc) {
118         /* Hmm, we have to allocate some more space */
119         p->n_pollfd_alloc = p->n_pollfd_used * 2;
120         p->pollfd2 = pa_xrealloc(p->pollfd2, p->n_pollfd_alloc * sizeof(struct pollfd));
121         ra = 1;
122     }
123
124     e = p->pollfd2;
125
126     for (i = p->items; i; i = i->next) {
127
128         if (i->n_pollfd > 0)  {
129             size_t l = i->n_pollfd * sizeof(struct pollfd);
130
131             if (i->pollfd)
132                 memcpy(e, i->pollfd, l);
133             else
134                 memset(e, 0, l);
135
136             i->pollfd = e;
137         } else
138             i->pollfd = NULL;
139
140         e += i->n_pollfd;
141     }
142
143     pa_assert((unsigned) (e - p->pollfd2) == p->n_pollfd_used);
144     t = p->pollfd;
145     p->pollfd = p->pollfd2;
146     p->pollfd2 = t;
147
148     if (ra)
149         p->pollfd2 = pa_xrealloc(p->pollfd2, p->n_pollfd_alloc * sizeof(struct pollfd));
150 }
151
152 static void rtpoll_item_destroy(pa_rtpoll_item *i) {
153     pa_rtpoll *p;
154
155     pa_assert(i);
156
157     p = i->rtpoll;
158
159     PA_LLIST_REMOVE(pa_rtpoll_item, p->items, i);
160
161     p->n_pollfd_used -= i->n_pollfd;
162
163     if (pa_flist_push(PA_STATIC_FLIST_GET(items), i) < 0)
164         pa_xfree(i);
165
166     p->rebuild_needed = TRUE;
167 }
168
169 void pa_rtpoll_free(pa_rtpoll *p) {
170     pa_assert(p);
171
172     while (p->items)
173         rtpoll_item_destroy(p->items);
174
175     pa_xfree(p->pollfd);
176     pa_xfree(p->pollfd2);
177
178     pa_xfree(p);
179 }
180
181 static void reset_revents(pa_rtpoll_item *i) {
182     struct pollfd *f;
183     unsigned n;
184
185     pa_assert(i);
186
187     if (!(f = pa_rtpoll_item_get_pollfd(i, &n)))
188         return;
189
190     for (; n > 0; n--)
191         f[n-1].revents = 0;
192 }
193
194 static void reset_all_revents(pa_rtpoll *p) {
195     pa_rtpoll_item *i;
196
197     pa_assert(p);
198
199     for (i = p->items; i; i = i->next) {
200
201         if (i->dead)
202             continue;
203
204         reset_revents(i);
205     }
206 }
207
208 int pa_rtpoll_run(pa_rtpoll *p, pa_bool_t wait_op) {
209     pa_rtpoll_item *i;
210     int r = 0;
211     struct timeval timeout;
212
213     pa_assert(p);
214     pa_assert(!p->running);
215
216 #ifdef DEBUG_TIMING
217     pa_log("rtpoll_run");
218 #endif
219
220     p->running = TRUE;
221     p->timer_elapsed = FALSE;
222
223     /* First, let's do some work */
224     for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
225         int k;
226
227         if (i->dead)
228             continue;
229
230         if (!i->work_cb)
231             continue;
232
233         if (p->quit) {
234 #ifdef DEBUG_TIMING
235             pa_log("rtpoll finish");
236 #endif
237             goto finish;
238         }
239
240         if ((k = i->work_cb(i)) != 0) {
241             if (k < 0)
242                 r = k;
243 #ifdef DEBUG_TIMING
244             pa_log("rtpoll finish");
245 #endif
246             goto finish;
247         }
248     }
249
250     /* Now let's prepare for entering the sleep */
251     for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
252         int k = 0;
253
254         if (i->dead)
255             continue;
256
257         if (!i->before_cb)
258             continue;
259
260         if (p->quit || (k = i->before_cb(i)) != 0) {
261
262             /* Hmm, this one doesn't let us enter the poll, so rewind everything */
263
264             for (i = i->prev; i; i = i->prev) {
265
266                 if (i->dead)
267                     continue;
268
269                 if (!i->after_cb)
270                     continue;
271
272                 i->after_cb(i);
273             }
274
275             if (k < 0)
276                 r = k;
277 #ifdef DEBUG_TIMING
278             pa_log("rtpoll finish");
279 #endif
280             goto finish;
281         }
282     }
283
284     if (p->rebuild_needed)
285         rtpoll_rebuild(p);
286
287     pa_zero(timeout);
288
289     /* Calculate timeout */
290     if (wait_op && !p->quit && p->timer_enabled) {
291         struct timeval now;
292         pa_rtclock_get(&now);
293
294         if (pa_timeval_cmp(&p->next_elapse, &now) > 0)
295             pa_timeval_add(&timeout, pa_timeval_diff(&p->next_elapse, &now));
296     }
297
298 #ifdef DEBUG_TIMING
299     {
300         pa_usec_t now = pa_rtclock_now();
301         p->awake = now - p->timestamp;
302         p->timestamp = now;
303         if (!wait_op || p->quit || p->timer_enabled)
304             pa_log("poll timeout: %d ms ",(int) ((timeout.tv_sec*1000) + (timeout.tv_usec / 1000)));
305         else
306             pa_log("poll timeout is ZERO");
307     }
308 #endif
309
310     /* OK, now let's sleep */
311 #ifdef HAVE_PPOLL
312     {
313         struct timespec ts;
314         ts.tv_sec = timeout.tv_sec;
315         ts.tv_nsec = timeout.tv_usec * 1000;
316         r = ppoll(p->pollfd, p->n_pollfd_used, (!wait_op || p->quit || p->timer_enabled) ? &ts : NULL, NULL);
317     }
318 #else
319     r = pa_poll(p->pollfd, p->n_pollfd_used, (!wait_op || p->quit || p->timer_enabled) ? (int) ((timeout.tv_sec*1000) + (timeout.tv_usec / 1000)) : -1);
320 #endif
321
322     p->timer_elapsed = r == 0;
323
324 #ifdef DEBUG_TIMING
325     {
326         pa_usec_t now = pa_rtclock_now();
327         p->slept = now - p->timestamp;
328         p->timestamp = now;
329
330         pa_log("Process time %llu ms; sleep time %llu ms",
331                (unsigned long long) (p->awake / PA_USEC_PER_MSEC),
332                (unsigned long long) (p->slept / PA_USEC_PER_MSEC));
333     }
334 #endif
335
336     if (r < 0) {
337         if (errno == EAGAIN || errno == EINTR)
338             r = 0;
339         else
340             pa_log_error("poll(): %s", pa_cstrerror(errno));
341
342         reset_all_revents(p);
343     }
344
345     /* Let's tell everyone that we left the sleep */
346     for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
347
348         if (i->dead)
349             continue;
350
351         if (!i->after_cb)
352             continue;
353
354         i->after_cb(i);
355     }
356
357 finish:
358
359     p->running = FALSE;
360
361     if (p->scan_for_dead) {
362         pa_rtpoll_item *n;
363
364         p->scan_for_dead = FALSE;
365
366         for (i = p->items; i; i = n) {
367             n = i->next;
368
369             if (i->dead)
370                 rtpoll_item_destroy(i);
371         }
372     }
373
374     return r < 0 ? r : !p->quit;
375 }
376
377 void pa_rtpoll_set_timer_absolute(pa_rtpoll *p, pa_usec_t usec) {
378     pa_assert(p);
379
380     pa_timeval_store(&p->next_elapse, usec);
381     p->timer_enabled = TRUE;
382 }
383
384 void pa_rtpoll_set_timer_relative(pa_rtpoll *p, pa_usec_t usec) {
385     pa_assert(p);
386
387     /* Scheduling a timeout for more than an hour is very very suspicious */
388     pa_assert(usec <= PA_USEC_PER_SEC*60ULL*60ULL);
389
390     pa_rtclock_get(&p->next_elapse);
391     pa_timeval_add(&p->next_elapse, usec);
392     p->timer_enabled = TRUE;
393 }
394
395 void pa_rtpoll_set_timer_disabled(pa_rtpoll *p) {
396     pa_assert(p);
397
398     memset(&p->next_elapse, 0, sizeof(p->next_elapse));
399     p->timer_enabled = FALSE;
400 }
401
402 pa_rtpoll_item *pa_rtpoll_item_new(pa_rtpoll *p, pa_rtpoll_priority_t prio, unsigned n_fds) {
403     pa_rtpoll_item *i, *j, *l = NULL;
404
405     pa_assert(p);
406
407     if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
408         i = pa_xnew(pa_rtpoll_item, 1);
409
410     i->rtpoll = p;
411     i->dead = FALSE;
412     i->n_pollfd = n_fds;
413     i->pollfd = NULL;
414     i->priority = prio;
415
416     i->userdata = NULL;
417     i->before_cb = NULL;
418     i->after_cb = NULL;
419     i->work_cb = NULL;
420
421     for (j = p->items; j; j = j->next) {
422         if (prio <= j->priority)
423             break;
424
425         l = j;
426     }
427
428     PA_LLIST_INSERT_AFTER(pa_rtpoll_item, p->items, j ? j->prev : l, i);
429
430     if (n_fds > 0) {
431         p->rebuild_needed = 1;
432         p->n_pollfd_used += n_fds;
433     }
434
435     return i;
436 }
437
438 void pa_rtpoll_item_free(pa_rtpoll_item *i) {
439     pa_assert(i);
440
441     if (i->rtpoll->running) {
442         i->dead = TRUE;
443         i->rtpoll->scan_for_dead = TRUE;
444         return;
445     }
446
447     rtpoll_item_destroy(i);
448 }
449
450 struct pollfd *pa_rtpoll_item_get_pollfd(pa_rtpoll_item *i, unsigned *n_fds) {
451     pa_assert(i);
452
453     if (i->n_pollfd > 0)
454         if (i->rtpoll->rebuild_needed)
455             rtpoll_rebuild(i->rtpoll);
456
457     if (n_fds)
458         *n_fds = i->n_pollfd;
459
460     return i->pollfd;
461 }
462
463 void pa_rtpoll_item_set_before_callback(pa_rtpoll_item *i, int (*before_cb)(pa_rtpoll_item *i)) {
464     pa_assert(i);
465     pa_assert(i->priority < PA_RTPOLL_NEVER);
466
467     i->before_cb = before_cb;
468 }
469
470 void pa_rtpoll_item_set_after_callback(pa_rtpoll_item *i, void (*after_cb)(pa_rtpoll_item *i)) {
471     pa_assert(i);
472     pa_assert(i->priority < PA_RTPOLL_NEVER);
473
474     i->after_cb = after_cb;
475 }
476
477 void pa_rtpoll_item_set_work_callback(pa_rtpoll_item *i, int (*work_cb)(pa_rtpoll_item *i)) {
478     pa_assert(i);
479     pa_assert(i->priority < PA_RTPOLL_NEVER);
480
481     i->work_cb = work_cb;
482 }
483
484 void pa_rtpoll_item_set_userdata(pa_rtpoll_item *i, void *userdata) {
485     pa_assert(i);
486
487     i->userdata = userdata;
488 }
489
490 void* pa_rtpoll_item_get_userdata(pa_rtpoll_item *i) {
491     pa_assert(i);
492
493     return i->userdata;
494 }
495
496 static int fdsem_before(pa_rtpoll_item *i) {
497
498     if (pa_fdsem_before_poll(i->userdata) < 0)
499         return 1; /* 1 means immediate restart of the loop */
500
501     return 0;
502 }
503
504 static void fdsem_after(pa_rtpoll_item *i) {
505     pa_assert(i);
506
507     pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
508     pa_fdsem_after_poll(i->userdata);
509 }
510
511 pa_rtpoll_item *pa_rtpoll_item_new_fdsem(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_fdsem *f) {
512     pa_rtpoll_item *i;
513     struct pollfd *pollfd;
514
515     pa_assert(p);
516     pa_assert(f);
517
518     i = pa_rtpoll_item_new(p, prio, 1);
519
520     pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
521
522     pollfd->fd = pa_fdsem_get(f);
523     pollfd->events = POLLIN;
524
525     i->before_cb = fdsem_before;
526     i->after_cb = fdsem_after;
527     i->userdata = f;
528
529     return i;
530 }
531
532 static int asyncmsgq_read_before(pa_rtpoll_item *i) {
533     pa_assert(i);
534
535     if (pa_asyncmsgq_read_before_poll(i->userdata) < 0)
536         return 1; /* 1 means immediate restart of the loop */
537
538     return 0;
539 }
540
541 static void asyncmsgq_read_after(pa_rtpoll_item *i) {
542     pa_assert(i);
543
544     pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
545     pa_asyncmsgq_read_after_poll(i->userdata);
546 }
547
548 static int asyncmsgq_read_work(pa_rtpoll_item *i) {
549     pa_msgobject *object;
550     int code;
551     void *data;
552     pa_memchunk chunk;
553     int64_t offset;
554
555     pa_assert(i);
556
557     if (pa_asyncmsgq_get(i->userdata, &object, &code, &data, &offset, &chunk, 0) == 0) {
558         int ret;
559
560         if (!object && code == PA_MESSAGE_SHUTDOWN) {
561             pa_asyncmsgq_done(i->userdata, 0);
562             pa_rtpoll_quit(i->rtpoll);
563             return 1;
564         }
565
566         ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
567         pa_asyncmsgq_done(i->userdata, ret);
568         return 1;
569     }
570
571     return 0;
572 }
573
574 pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_read(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) {
575     pa_rtpoll_item *i;
576     struct pollfd *pollfd;
577
578     pa_assert(p);
579     pa_assert(q);
580
581     i = pa_rtpoll_item_new(p, prio, 1);
582
583     pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
584     pollfd->fd = pa_asyncmsgq_read_fd(q);
585     pollfd->events = POLLIN;
586
587     i->before_cb = asyncmsgq_read_before;
588     i->after_cb = asyncmsgq_read_after;
589     i->work_cb = asyncmsgq_read_work;
590     i->userdata = q;
591
592     return i;
593 }
594
595 static int asyncmsgq_write_before(pa_rtpoll_item *i) {
596     pa_assert(i);
597
598     pa_asyncmsgq_write_before_poll(i->userdata);
599     return 0;
600 }
601
602 static void asyncmsgq_write_after(pa_rtpoll_item *i) {
603     pa_assert(i);
604
605     pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
606     pa_asyncmsgq_write_after_poll(i->userdata);
607 }
608
609 pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_write(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) {
610     pa_rtpoll_item *i;
611     struct pollfd *pollfd;
612
613     pa_assert(p);
614     pa_assert(q);
615
616     i = pa_rtpoll_item_new(p, prio, 1);
617
618     pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
619     pollfd->fd = pa_asyncmsgq_write_fd(q);
620     pollfd->events = POLLIN;
621
622     i->before_cb = asyncmsgq_write_before;
623     i->after_cb = asyncmsgq_write_after;
624     i->work_cb = NULL;
625     i->userdata = q;
626
627     return i;
628 }
629
630 void pa_rtpoll_quit(pa_rtpoll *p) {
631     pa_assert(p);
632
633     p->quit = TRUE;
634 }
635
636 pa_bool_t pa_rtpoll_timer_elapsed(pa_rtpoll *p) {
637     pa_assert(p);
638
639     return p->timer_elapsed;
640 }