Merge branch 'master' of git://0pointer.de/pulseaudio
[profile/ivi/pulseaudio.git] / src / pulse / stream.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/rtclock.h>
34 #include <pulse/xmalloc.h>
35
36 #include <pulsecore/pstream-util.h>
37 #include <pulsecore/log.h>
38 #include <pulsecore/hashmap.h>
39 #include <pulsecore/macro.h>
40 #include <pulsecore/core-rtclock.h>
41
42 #include "fork-detect.h"
43 #include "internal.h"
44
45 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
46 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
47
48 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
49 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
50 #define SMOOTHER_MIN_HISTORY (4)
51
52 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
53     return pa_stream_new_with_proplist(c, name, ss, map, NULL);
54 }
55
56 static void reset_callbacks(pa_stream *s) {
57     s->read_callback = NULL;
58     s->read_userdata = NULL;
59     s->write_callback = NULL;
60     s->write_userdata = NULL;
61     s->state_callback = NULL;
62     s->state_userdata = NULL;
63     s->overflow_callback = NULL;
64     s->overflow_userdata = NULL;
65     s->underflow_callback = NULL;
66     s->underflow_userdata = NULL;
67     s->latency_update_callback = NULL;
68     s->latency_update_userdata = NULL;
69     s->moved_callback = NULL;
70     s->moved_userdata = NULL;
71     s->suspended_callback = NULL;
72     s->suspended_userdata = NULL;
73     s->started_callback = NULL;
74     s->started_userdata = NULL;
75     s->event_callback = NULL;
76     s->event_userdata = NULL;
77     s->buffer_attr_callback = NULL;
78     s->buffer_attr_userdata = NULL;
79 }
80
81 pa_stream *pa_stream_new_with_proplist(
82         pa_context *c,
83         const char *name,
84         const pa_sample_spec *ss,
85         const pa_channel_map *map,
86         pa_proplist *p) {
87
88     pa_stream *s;
89     int i;
90     pa_channel_map tmap;
91
92     pa_assert(c);
93     pa_assert(PA_REFCNT_VALUE(c) >= 1);
94
95     PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
96     PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
97     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
98     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
99     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
100     PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
101     PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
102
103     if (!map)
104         PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
105
106     s = pa_xnew(pa_stream, 1);
107     PA_REFCNT_INIT(s);
108     s->context = c;
109     s->mainloop = c->mainloop;
110
111     s->direction = PA_STREAM_NODIRECTION;
112     s->state = PA_STREAM_UNCONNECTED;
113     s->flags = 0;
114
115     s->sample_spec = *ss;
116     s->channel_map = *map;
117
118     s->direct_on_input = PA_INVALID_INDEX;
119
120     s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
121     if (name)
122         pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
123
124     s->channel = 0;
125     s->channel_valid = FALSE;
126     s->syncid = c->csyncid++;
127     s->stream_index = PA_INVALID_INDEX;
128
129     s->requested_bytes = 0;
130     memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
131
132     /* We initialize der target length here, so that if the user
133      * passes no explicit buffering metrics the default is similar to
134      * what older PA versions provided. */
135
136     s->buffer_attr.maxlength = (uint32_t) -1;
137     s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
138     s->buffer_attr.minreq = (uint32_t) -1;
139     s->buffer_attr.prebuf = (uint32_t) -1;
140     s->buffer_attr.fragsize = (uint32_t) -1;
141
142     s->device_index = PA_INVALID_INDEX;
143     s->device_name = NULL;
144     s->suspended = FALSE;
145     s->corked = FALSE;
146
147     s->write_memblock = NULL;
148     s->write_data = NULL;
149
150     pa_memchunk_reset(&s->peek_memchunk);
151     s->peek_data = NULL;
152     s->record_memblockq = NULL;
153
154     memset(&s->timing_info, 0, sizeof(s->timing_info));
155     s->timing_info_valid = FALSE;
156
157     s->previous_time = 0;
158
159     s->read_index_not_before = 0;
160     s->write_index_not_before = 0;
161     for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
162         s->write_index_corrections[i].valid = 0;
163     s->current_write_index_correction = 0;
164
165     s->auto_timing_update_event = NULL;
166     s->auto_timing_update_requested = FALSE;
167     s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
168
169     reset_callbacks(s);
170
171     s->smoother = NULL;
172
173     /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
174     PA_LLIST_PREPEND(pa_stream, c->streams, s);
175     pa_stream_ref(s);
176
177     return s;
178 }
179
180 static void stream_unlink(pa_stream *s) {
181     pa_operation *o, *n;
182     pa_assert(s);
183
184     if (!s->context)
185         return;
186
187     /* Detach from context */
188
189     /* Unref all operatio object that point to us */
190     for (o = s->context->operations; o; o = n) {
191         n = o->next;
192
193         if (o->stream == s)
194             pa_operation_cancel(o);
195     }
196
197     /* Drop all outstanding replies for this stream */
198     if (s->context->pdispatch)
199         pa_pdispatch_unregister_reply(s->context->pdispatch, s);
200
201     if (s->channel_valid) {
202         pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL);
203         s->channel = 0;
204         s->channel_valid = FALSE;
205     }
206
207     PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
208     pa_stream_unref(s);
209
210     s->context = NULL;
211
212     if (s->auto_timing_update_event) {
213         pa_assert(s->mainloop);
214         s->mainloop->time_free(s->auto_timing_update_event);
215     }
216
217     reset_callbacks(s);
218 }
219
220 static void stream_free(pa_stream *s) {
221     pa_assert(s);
222
223     stream_unlink(s);
224
225     if (s->write_memblock) {
226         pa_memblock_release(s->write_memblock);
227         pa_memblock_unref(s->write_data);
228     }
229
230     if (s->peek_memchunk.memblock) {
231         if (s->peek_data)
232             pa_memblock_release(s->peek_memchunk.memblock);
233         pa_memblock_unref(s->peek_memchunk.memblock);
234     }
235
236     if (s->record_memblockq)
237         pa_memblockq_free(s->record_memblockq);
238
239     if (s->proplist)
240         pa_proplist_free(s->proplist);
241
242     if (s->smoother)
243         pa_smoother_free(s->smoother);
244
245     pa_xfree(s->device_name);
246     pa_xfree(s);
247 }
248
249 void pa_stream_unref(pa_stream *s) {
250     pa_assert(s);
251     pa_assert(PA_REFCNT_VALUE(s) >= 1);
252
253     if (PA_REFCNT_DEC(s) <= 0)
254         stream_free(s);
255 }
256
257 pa_stream* pa_stream_ref(pa_stream *s) {
258     pa_assert(s);
259     pa_assert(PA_REFCNT_VALUE(s) >= 1);
260
261     PA_REFCNT_INC(s);
262     return s;
263 }
264
265 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
266     pa_assert(s);
267     pa_assert(PA_REFCNT_VALUE(s) >= 1);
268
269     return s->state;
270 }
271
272 pa_context* pa_stream_get_context(pa_stream *s) {
273     pa_assert(s);
274     pa_assert(PA_REFCNT_VALUE(s) >= 1);
275
276     return s->context;
277 }
278
279 uint32_t pa_stream_get_index(pa_stream *s) {
280     pa_assert(s);
281     pa_assert(PA_REFCNT_VALUE(s) >= 1);
282
283     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
284     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
285
286     return s->stream_index;
287 }
288
289 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
290     pa_assert(s);
291     pa_assert(PA_REFCNT_VALUE(s) >= 1);
292
293     if (s->state == st)
294         return;
295
296     pa_stream_ref(s);
297
298     s->state = st;
299
300     if (s->state_callback)
301         s->state_callback(s, s->state_userdata);
302
303     if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
304         stream_unlink(s);
305
306     pa_stream_unref(s);
307 }
308
309 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
310     pa_assert(s);
311     pa_assert(PA_REFCNT_VALUE(s) >= 1);
312
313     if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
314         return;
315
316     if (s->state == PA_STREAM_READY &&
317         (force || !s->auto_timing_update_requested)) {
318         pa_operation *o;
319
320 /*         pa_log("Automatically requesting new timing data"); */
321
322         if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
323             pa_operation_unref(o);
324             s->auto_timing_update_requested = TRUE;
325         }
326     }
327
328     if (s->auto_timing_update_event) {
329         if (force)
330             s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
331
332         pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
333
334         s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
335     }
336 }
337
338 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
339     pa_context *c = userdata;
340     pa_stream *s;
341     uint32_t channel;
342
343     pa_assert(pd);
344     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
345     pa_assert(t);
346     pa_assert(c);
347     pa_assert(PA_REFCNT_VALUE(c) >= 1);
348
349     pa_context_ref(c);
350
351     if (pa_tagstruct_getu32(t, &channel) < 0 ||
352         !pa_tagstruct_eof(t)) {
353         pa_context_fail(c, PA_ERR_PROTOCOL);
354         goto finish;
355     }
356
357     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel)))
358         goto finish;
359
360     if (s->state != PA_STREAM_READY)
361         goto finish;
362
363     pa_context_set_error(c, PA_ERR_KILLED);
364     pa_stream_set_state(s, PA_STREAM_FAILED);
365
366 finish:
367     pa_context_unref(c);
368 }
369
370 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
371     pa_usec_t x;
372
373     pa_assert(s);
374     pa_assert(!force_start || !force_stop);
375
376     if (!s->smoother)
377         return;
378
379     x = pa_rtclock_now();
380
381     if (s->timing_info_valid) {
382         if (aposteriori)
383             x -= s->timing_info.transport_usec;
384         else
385             x += s->timing_info.transport_usec;
386     }
387
388     if (s->suspended || s->corked || force_stop)
389         pa_smoother_pause(s->smoother, x);
390     else if (force_start || s->buffer_attr.prebuf == 0) {
391
392         if (!s->timing_info_valid &&
393             !aposteriori &&
394             !force_start &&
395             !force_stop &&
396             s->context->version >= 13) {
397
398             /* If the server supports STARTED events we take them as
399              * indications when audio really starts/stops playing, if
400              * we don't have any timing info yet -- instead of trying
401              * to be smart and guessing the server time. Otherwise the
402              * unknown transport delay add too much noise to our time
403              * calculations. */
404
405             return;
406         }
407
408         pa_smoother_resume(s->smoother, x, TRUE);
409     }
410
411     /* Please note that we have no idea if playback actually started
412      * if prebuf is non-zero! */
413 }
414
415 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
416     pa_context *c = userdata;
417     pa_stream *s;
418     uint32_t channel;
419     const char *dn;
420     pa_bool_t suspended;
421     uint32_t di;
422     pa_usec_t usec = 0;
423     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
424
425     pa_assert(pd);
426     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
427     pa_assert(t);
428     pa_assert(c);
429     pa_assert(PA_REFCNT_VALUE(c) >= 1);
430
431     pa_context_ref(c);
432
433     if (c->version < 12) {
434         pa_context_fail(c, PA_ERR_PROTOCOL);
435         goto finish;
436     }
437
438     if (pa_tagstruct_getu32(t, &channel) < 0 ||
439         pa_tagstruct_getu32(t, &di) < 0 ||
440         pa_tagstruct_gets(t, &dn) < 0 ||
441         pa_tagstruct_get_boolean(t, &suspended) < 0) {
442         pa_context_fail(c, PA_ERR_PROTOCOL);
443         goto finish;
444     }
445
446     if (c->version >= 13) {
447
448         if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
449             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
450                 pa_tagstruct_getu32(t, &fragsize) < 0 ||
451                 pa_tagstruct_get_usec(t, &usec) < 0) {
452                 pa_context_fail(c, PA_ERR_PROTOCOL);
453                 goto finish;
454             }
455         } else {
456             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
457                 pa_tagstruct_getu32(t, &tlength) < 0 ||
458                 pa_tagstruct_getu32(t, &prebuf) < 0 ||
459                 pa_tagstruct_getu32(t, &minreq) < 0 ||
460                 pa_tagstruct_get_usec(t, &usec) < 0) {
461                 pa_context_fail(c, PA_ERR_PROTOCOL);
462                 goto finish;
463             }
464         }
465     }
466
467     if (!pa_tagstruct_eof(t)) {
468         pa_context_fail(c, PA_ERR_PROTOCOL);
469         goto finish;
470     }
471
472     if (!dn || di == PA_INVALID_INDEX) {
473         pa_context_fail(c, PA_ERR_PROTOCOL);
474         goto finish;
475     }
476
477     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, channel)))
478         goto finish;
479
480     if (s->state != PA_STREAM_READY)
481         goto finish;
482
483     if (c->version >= 13) {
484         if (s->direction == PA_STREAM_RECORD)
485             s->timing_info.configured_source_usec = usec;
486         else
487             s->timing_info.configured_sink_usec = usec;
488
489         s->buffer_attr.maxlength = maxlength;
490         s->buffer_attr.fragsize = fragsize;
491         s->buffer_attr.tlength = tlength;
492         s->buffer_attr.prebuf = prebuf;
493         s->buffer_attr.minreq = minreq;
494     }
495
496     pa_xfree(s->device_name);
497     s->device_name = pa_xstrdup(dn);
498     s->device_index = di;
499
500     s->suspended = suspended;
501
502     check_smoother_status(s, TRUE, FALSE, FALSE);
503     request_auto_timing_update(s, TRUE);
504
505     if (s->moved_callback)
506         s->moved_callback(s, s->moved_userdata);
507
508 finish:
509     pa_context_unref(c);
510 }
511
512 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
513     pa_context *c = userdata;
514     pa_stream *s;
515     uint32_t channel;
516     pa_usec_t usec = 0;
517     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
518
519     pa_assert(pd);
520     pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
521     pa_assert(t);
522     pa_assert(c);
523     pa_assert(PA_REFCNT_VALUE(c) >= 1);
524
525     pa_context_ref(c);
526
527     if (c->version < 15) {
528         pa_context_fail(c, PA_ERR_PROTOCOL);
529         goto finish;
530     }
531
532     if (pa_tagstruct_getu32(t, &channel) < 0) {
533         pa_context_fail(c, PA_ERR_PROTOCOL);
534         goto finish;
535     }
536
537     if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
538         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
539             pa_tagstruct_getu32(t, &fragsize) < 0 ||
540             pa_tagstruct_get_usec(t, &usec) < 0) {
541             pa_context_fail(c, PA_ERR_PROTOCOL);
542             goto finish;
543         }
544     } else {
545         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
546             pa_tagstruct_getu32(t, &tlength) < 0 ||
547             pa_tagstruct_getu32(t, &prebuf) < 0 ||
548             pa_tagstruct_getu32(t, &minreq) < 0 ||
549             pa_tagstruct_get_usec(t, &usec) < 0) {
550             pa_context_fail(c, PA_ERR_PROTOCOL);
551             goto finish;
552         }
553     }
554
555     if (!pa_tagstruct_eof(t)) {
556         pa_context_fail(c, PA_ERR_PROTOCOL);
557         goto finish;
558     }
559
560     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, channel)))
561         goto finish;
562
563     if (s->state != PA_STREAM_READY)
564         goto finish;
565
566     if (s->direction == PA_STREAM_RECORD)
567         s->timing_info.configured_source_usec = usec;
568     else
569         s->timing_info.configured_sink_usec = usec;
570
571     s->buffer_attr.maxlength = maxlength;
572     s->buffer_attr.fragsize = fragsize;
573     s->buffer_attr.tlength = tlength;
574     s->buffer_attr.prebuf = prebuf;
575     s->buffer_attr.minreq = minreq;
576
577     request_auto_timing_update(s, TRUE);
578
579     if (s->buffer_attr_callback)
580         s->buffer_attr_callback(s, s->buffer_attr_userdata);
581
582 finish:
583     pa_context_unref(c);
584 }
585
586 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
587     pa_context *c = userdata;
588     pa_stream *s;
589     uint32_t channel;
590     pa_bool_t suspended;
591
592     pa_assert(pd);
593     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
594     pa_assert(t);
595     pa_assert(c);
596     pa_assert(PA_REFCNT_VALUE(c) >= 1);
597
598     pa_context_ref(c);
599
600     if (c->version < 12) {
601         pa_context_fail(c, PA_ERR_PROTOCOL);
602         goto finish;
603     }
604
605     if (pa_tagstruct_getu32(t, &channel) < 0 ||
606         pa_tagstruct_get_boolean(t, &suspended) < 0 ||
607         !pa_tagstruct_eof(t)) {
608         pa_context_fail(c, PA_ERR_PROTOCOL);
609         goto finish;
610     }
611
612     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, channel)))
613         goto finish;
614
615     if (s->state != PA_STREAM_READY)
616         goto finish;
617
618     s->suspended = suspended;
619
620     check_smoother_status(s, TRUE, FALSE, FALSE);
621     request_auto_timing_update(s, TRUE);
622
623     if (s->suspended_callback)
624         s->suspended_callback(s, s->suspended_userdata);
625
626 finish:
627     pa_context_unref(c);
628 }
629
630 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
631     pa_context *c = userdata;
632     pa_stream *s;
633     uint32_t channel;
634
635     pa_assert(pd);
636     pa_assert(command == PA_COMMAND_STARTED);
637     pa_assert(t);
638     pa_assert(c);
639     pa_assert(PA_REFCNT_VALUE(c) >= 1);
640
641     pa_context_ref(c);
642
643     if (c->version < 13) {
644         pa_context_fail(c, PA_ERR_PROTOCOL);
645         goto finish;
646     }
647
648     if (pa_tagstruct_getu32(t, &channel) < 0 ||
649         !pa_tagstruct_eof(t)) {
650         pa_context_fail(c, PA_ERR_PROTOCOL);
651         goto finish;
652     }
653
654     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
655         goto finish;
656
657     if (s->state != PA_STREAM_READY)
658         goto finish;
659
660     check_smoother_status(s, TRUE, TRUE, FALSE);
661     request_auto_timing_update(s, TRUE);
662
663     if (s->started_callback)
664         s->started_callback(s, s->started_userdata);
665
666 finish:
667     pa_context_unref(c);
668 }
669
670 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
671     pa_context *c = userdata;
672     pa_stream *s;
673     uint32_t channel;
674     pa_proplist *pl = NULL;
675     const char *event;
676
677     pa_assert(pd);
678     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
679     pa_assert(t);
680     pa_assert(c);
681     pa_assert(PA_REFCNT_VALUE(c) >= 1);
682
683     pa_context_ref(c);
684
685     if (c->version < 15) {
686         pa_context_fail(c, PA_ERR_PROTOCOL);
687         goto finish;
688     }
689
690     pl = pa_proplist_new();
691
692     if (pa_tagstruct_getu32(t, &channel) < 0 ||
693         pa_tagstruct_gets(t, &event) < 0 ||
694         pa_tagstruct_get_proplist(t, pl) < 0 ||
695         !pa_tagstruct_eof(t) || !event) {
696         pa_context_fail(c, PA_ERR_PROTOCOL);
697         goto finish;
698     }
699
700     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, channel)))
701         goto finish;
702
703     if (s->state != PA_STREAM_READY)
704         goto finish;
705
706     if (s->event_callback)
707         s->event_callback(s, event, pl, s->event_userdata);
708
709 finish:
710     pa_context_unref(c);
711
712     if (pl)
713         pa_proplist_free(pl);
714 }
715
716 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
717     pa_stream *s;
718     pa_context *c = userdata;
719     uint32_t bytes, channel;
720
721     pa_assert(pd);
722     pa_assert(command == PA_COMMAND_REQUEST);
723     pa_assert(t);
724     pa_assert(c);
725     pa_assert(PA_REFCNT_VALUE(c) >= 1);
726
727     pa_context_ref(c);
728
729     if (pa_tagstruct_getu32(t, &channel) < 0 ||
730         pa_tagstruct_getu32(t, &bytes) < 0 ||
731         !pa_tagstruct_eof(t)) {
732         pa_context_fail(c, PA_ERR_PROTOCOL);
733         goto finish;
734     }
735
736     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
737         goto finish;
738
739     if (s->state != PA_STREAM_READY)
740         goto finish;
741
742     s->requested_bytes += bytes;
743
744     if (s->requested_bytes > 0 && s->write_callback)
745         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
746
747 finish:
748     pa_context_unref(c);
749 }
750
751 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
752     pa_stream *s;
753     pa_context *c = userdata;
754     uint32_t channel;
755
756     pa_assert(pd);
757     pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
758     pa_assert(t);
759     pa_assert(c);
760     pa_assert(PA_REFCNT_VALUE(c) >= 1);
761
762     pa_context_ref(c);
763
764     if (pa_tagstruct_getu32(t, &channel) < 0 ||
765         !pa_tagstruct_eof(t)) {
766         pa_context_fail(c, PA_ERR_PROTOCOL);
767         goto finish;
768     }
769
770     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
771         goto finish;
772
773     if (s->state != PA_STREAM_READY)
774         goto finish;
775
776     if (s->buffer_attr.prebuf > 0)
777         check_smoother_status(s, TRUE, FALSE, TRUE);
778
779     request_auto_timing_update(s, TRUE);
780
781     if (command == PA_COMMAND_OVERFLOW) {
782         if (s->overflow_callback)
783             s->overflow_callback(s, s->overflow_userdata);
784     } else if (command == PA_COMMAND_UNDERFLOW) {
785         if (s->underflow_callback)
786             s->underflow_callback(s, s->underflow_userdata);
787     }
788
789  finish:
790     pa_context_unref(c);
791 }
792
793 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
794     pa_assert(s);
795     pa_assert(PA_REFCNT_VALUE(s) >= 1);
796
797 /*     pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
798
799     if (s->state != PA_STREAM_READY)
800         return;
801
802     if (w) {
803         s->write_index_not_before = s->context->ctag;
804
805         if (s->timing_info_valid)
806             s->timing_info.write_index_corrupt = TRUE;
807
808 /*         pa_log("write_index invalidated"); */
809     }
810
811     if (r) {
812         s->read_index_not_before = s->context->ctag;
813
814         if (s->timing_info_valid)
815             s->timing_info.read_index_corrupt = TRUE;
816
817 /*         pa_log("read_index invalidated"); */
818     }
819
820     request_auto_timing_update(s, TRUE);
821 }
822
823 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
824     pa_stream *s = userdata;
825
826     pa_assert(s);
827     pa_assert(PA_REFCNT_VALUE(s) >= 1);
828
829     pa_stream_ref(s);
830     request_auto_timing_update(s, FALSE);
831     pa_stream_unref(s);
832 }
833
834 static void create_stream_complete(pa_stream *s) {
835     pa_assert(s);
836     pa_assert(PA_REFCNT_VALUE(s) >= 1);
837     pa_assert(s->state == PA_STREAM_CREATING);
838
839     pa_stream_set_state(s, PA_STREAM_READY);
840
841     if (s->requested_bytes > 0 && s->write_callback)
842         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
843
844     if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
845         s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
846         pa_assert(!s->auto_timing_update_event);
847         s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
848
849         request_auto_timing_update(s, TRUE);
850     }
851
852     check_smoother_status(s, TRUE, FALSE, FALSE);
853 }
854
855 static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_sample_spec *ss) {
856     pa_assert(s);
857     pa_assert(attr);
858     pa_assert(ss);
859
860     if (s->context->version >= 13)
861         return;
862
863     /* Version older than 0.9.10 didn't do server side buffer_attr
864      * selection, hence we have to fake it on the client side. */
865
866     /* We choose fairly conservative values here, to not confuse
867      * old clients with extremely large playback buffers */
868
869     if (attr->maxlength == (uint32_t) -1)
870         attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
871
872     if (attr->tlength == (uint32_t) -1)
873         attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
874
875     if (attr->minreq == (uint32_t) -1)
876         attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
877
878     if (attr->prebuf == (uint32_t) -1)
879         attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
880
881     if (attr->fragsize == (uint32_t) -1)
882         attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
883 }
884
885 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
886     pa_stream *s = userdata;
887     uint32_t requested_bytes = 0;
888
889     pa_assert(pd);
890     pa_assert(s);
891     pa_assert(PA_REFCNT_VALUE(s) >= 1);
892     pa_assert(s->state == PA_STREAM_CREATING);
893
894     pa_stream_ref(s);
895
896     if (command != PA_COMMAND_REPLY) {
897         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
898             goto finish;
899
900         pa_stream_set_state(s, PA_STREAM_FAILED);
901         goto finish;
902     }
903
904     if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
905         s->channel == PA_INVALID_INDEX ||
906         ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 ||  s->stream_index == PA_INVALID_INDEX)) ||
907         ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
908         pa_context_fail(s->context, PA_ERR_PROTOCOL);
909         goto finish;
910     }
911
912     s->requested_bytes = (int64_t) requested_bytes;
913
914     if (s->context->version >= 9) {
915         if (s->direction == PA_STREAM_PLAYBACK) {
916             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
917                 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
918                 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
919                 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
920                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
921                 goto finish;
922             }
923         } else if (s->direction == PA_STREAM_RECORD) {
924             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
925                 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
926                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
927                 goto finish;
928             }
929         }
930     }
931
932     if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
933         pa_sample_spec ss;
934         pa_channel_map cm;
935         const char *dn = NULL;
936         pa_bool_t suspended;
937
938         if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
939             pa_tagstruct_get_channel_map(t, &cm) < 0 ||
940             pa_tagstruct_getu32(t, &s->device_index) < 0 ||
941             pa_tagstruct_gets(t, &dn) < 0 ||
942             pa_tagstruct_get_boolean(t, &suspended) < 0) {
943             pa_context_fail(s->context, PA_ERR_PROTOCOL);
944             goto finish;
945         }
946
947         if (!dn || s->device_index == PA_INVALID_INDEX ||
948             ss.channels != cm.channels ||
949             !pa_channel_map_valid(&cm) ||
950             !pa_sample_spec_valid(&ss) ||
951             (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
952             (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
953             (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))) {
954             pa_context_fail(s->context, PA_ERR_PROTOCOL);
955             goto finish;
956         }
957
958         pa_xfree(s->device_name);
959         s->device_name = pa_xstrdup(dn);
960         s->suspended = suspended;
961
962         s->channel_map = cm;
963         s->sample_spec = ss;
964     }
965
966     if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
967         pa_usec_t usec;
968
969         if (pa_tagstruct_get_usec(t, &usec) < 0) {
970             pa_context_fail(s->context, PA_ERR_PROTOCOL);
971             goto finish;
972         }
973
974         if (s->direction == PA_STREAM_RECORD)
975             s->timing_info.configured_source_usec = usec;
976         else
977             s->timing_info.configured_sink_usec = usec;
978     }
979
980     if (!pa_tagstruct_eof(t)) {
981         pa_context_fail(s->context, PA_ERR_PROTOCOL);
982         goto finish;
983     }
984
985     if (s->direction == PA_STREAM_RECORD) {
986         pa_assert(!s->record_memblockq);
987
988         s->record_memblockq = pa_memblockq_new(
989                 0,
990                 s->buffer_attr.maxlength,
991                 0,
992                 pa_frame_size(&s->sample_spec),
993                 1,
994                 0,
995                 0,
996                 NULL);
997     }
998
999     s->channel_valid = TRUE;
1000     pa_dynarray_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, s->channel, s);
1001
1002     create_stream_complete(s);
1003
1004 finish:
1005     pa_stream_unref(s);
1006 }
1007
1008 static int create_stream(
1009         pa_stream_direction_t direction,
1010         pa_stream *s,
1011         const char *dev,
1012         const pa_buffer_attr *attr,
1013         pa_stream_flags_t flags,
1014         const pa_cvolume *volume,
1015         pa_stream *sync_stream) {
1016
1017     pa_tagstruct *t;
1018     uint32_t tag;
1019     pa_bool_t volume_set = FALSE;
1020
1021     pa_assert(s);
1022     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1023     pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1024
1025     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1026     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1027     PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1028     PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1029                                               PA_STREAM_INTERPOLATE_TIMING|
1030                                               PA_STREAM_NOT_MONOTONIC|
1031                                               PA_STREAM_AUTO_TIMING_UPDATE|
1032                                               PA_STREAM_NO_REMAP_CHANNELS|
1033                                               PA_STREAM_NO_REMIX_CHANNELS|
1034                                               PA_STREAM_FIX_FORMAT|
1035                                               PA_STREAM_FIX_RATE|
1036                                               PA_STREAM_FIX_CHANNELS|
1037                                               PA_STREAM_DONT_MOVE|
1038                                               PA_STREAM_VARIABLE_RATE|
1039                                               PA_STREAM_PEAK_DETECT|
1040                                               PA_STREAM_START_MUTED|
1041                                               PA_STREAM_ADJUST_LATENCY|
1042                                               PA_STREAM_EARLY_REQUESTS|
1043                                               PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1044                                               PA_STREAM_START_UNMUTED|
1045                                               PA_STREAM_FAIL_ON_SUSPEND|
1046                                               PA_STREAM_RELATIVE_VOLUME)), PA_ERR_INVALID);
1047
1048     PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1049     PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1050     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1051     /* Althought some of the other flags are not supported on older
1052      * version, we don't check for them here, because it doesn't hurt
1053      * when they are passed but actually not supported. This makes
1054      * client development easier */
1055
1056     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1057     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1058     PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
1059     PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1060     PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1061
1062     pa_stream_ref(s);
1063
1064     s->direction = direction;
1065     s->flags = flags;
1066     s->corked = !!(flags & PA_STREAM_START_CORKED);
1067
1068     if (sync_stream)
1069         s->syncid = sync_stream->syncid;
1070
1071     if (attr)
1072         s->buffer_attr = *attr;
1073     automatic_buffer_attr(s, &s->buffer_attr, &s->sample_spec);
1074
1075     if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1076         pa_usec_t x;
1077
1078         x = pa_rtclock_now();
1079
1080         pa_assert(!s->smoother);
1081         s->smoother = pa_smoother_new(
1082                 SMOOTHER_ADJUST_TIME,
1083                 SMOOTHER_HISTORY_TIME,
1084                 !(flags & PA_STREAM_NOT_MONOTONIC),
1085                 TRUE,
1086                 SMOOTHER_MIN_HISTORY,
1087                 x,
1088                 TRUE);
1089     }
1090
1091     if (!dev)
1092         dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1093
1094     t = pa_tagstruct_command(
1095             s->context,
1096             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1097             &tag);
1098
1099     if (s->context->version < 13)
1100         pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1101
1102     pa_tagstruct_put(
1103             t,
1104             PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1105             PA_TAG_CHANNEL_MAP, &s->channel_map,
1106             PA_TAG_U32, PA_INVALID_INDEX,
1107             PA_TAG_STRING, dev,
1108             PA_TAG_U32, s->buffer_attr.maxlength,
1109             PA_TAG_BOOLEAN, s->corked,
1110             PA_TAG_INVALID);
1111
1112     if (s->direction == PA_STREAM_PLAYBACK) {
1113         pa_cvolume cv;
1114
1115         pa_tagstruct_put(
1116                 t,
1117                 PA_TAG_U32, s->buffer_attr.tlength,
1118                 PA_TAG_U32, s->buffer_attr.prebuf,
1119                 PA_TAG_U32, s->buffer_attr.minreq,
1120                 PA_TAG_U32, s->syncid,
1121                 PA_TAG_INVALID);
1122
1123         volume_set = !!volume;
1124
1125         if (!volume)
1126             volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1127
1128         pa_tagstruct_put_cvolume(t, volume);
1129     } else
1130         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1131
1132     if (s->context->version >= 12) {
1133         pa_tagstruct_put(
1134                 t,
1135                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1136                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1137                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1138                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1139                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1140                 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1141                 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1142                 PA_TAG_INVALID);
1143     }
1144
1145     if (s->context->version >= 13) {
1146
1147         if (s->direction == PA_STREAM_PLAYBACK)
1148             pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1149         else
1150             pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1151
1152         pa_tagstruct_put(
1153                 t,
1154                 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1155                 PA_TAG_PROPLIST, s->proplist,
1156                 PA_TAG_INVALID);
1157
1158         if (s->direction == PA_STREAM_RECORD)
1159             pa_tagstruct_putu32(t, s->direct_on_input);
1160     }
1161
1162     if (s->context->version >= 14) {
1163
1164         if (s->direction == PA_STREAM_PLAYBACK)
1165             pa_tagstruct_put_boolean(t, volume_set);
1166
1167         pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1168     }
1169
1170     if (s->context->version >= 15) {
1171
1172         if (s->direction == PA_STREAM_PLAYBACK)
1173             pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1174
1175         pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1176         pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1177     }
1178
1179     if (s->context->version >= 17) {
1180
1181         if (s->direction == PA_STREAM_PLAYBACK)
1182             pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1183
1184     }
1185
1186     pa_pstream_send_tagstruct(s->context->pstream, t);
1187     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1188
1189     pa_stream_set_state(s, PA_STREAM_CREATING);
1190
1191     pa_stream_unref(s);
1192     return 0;
1193 }
1194
1195 int pa_stream_connect_playback(
1196         pa_stream *s,
1197         const char *dev,
1198         const pa_buffer_attr *attr,
1199         pa_stream_flags_t flags,
1200         const pa_cvolume *volume,
1201         pa_stream *sync_stream) {
1202
1203     pa_assert(s);
1204     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1205
1206     return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1207 }
1208
1209 int pa_stream_connect_record(
1210         pa_stream *s,
1211         const char *dev,
1212         const pa_buffer_attr *attr,
1213         pa_stream_flags_t flags) {
1214
1215     pa_assert(s);
1216     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1217
1218     return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1219 }
1220
1221 int pa_stream_begin_write(
1222         pa_stream *s,
1223         void **data,
1224         size_t *nbytes) {
1225
1226     pa_assert(s);
1227     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1228
1229     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1230     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1231     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1232     PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1233     PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1234
1235     if (*nbytes != (size_t) -1) {
1236         size_t m, fs;
1237
1238         m = pa_mempool_block_size_max(s->context->mempool);
1239         fs = pa_frame_size(&s->sample_spec);
1240
1241         m = (m / fs) * fs;
1242         if (*nbytes > m)
1243             *nbytes = m;
1244     }
1245
1246     if (!s->write_memblock) {
1247         s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1248         s->write_data = pa_memblock_acquire(s->write_memblock);
1249     }
1250
1251     *data = s->write_data;
1252     *nbytes = pa_memblock_get_length(s->write_memblock);
1253
1254     return 0;
1255 }
1256
1257 int pa_stream_cancel_write(
1258         pa_stream *s) {
1259
1260     pa_assert(s);
1261     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1262
1263     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1264     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1265     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1266     PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1267
1268     pa_assert(s->write_data);
1269
1270     pa_memblock_release(s->write_memblock);
1271     pa_memblock_unref(s->write_memblock);
1272     s->write_memblock = NULL;
1273     s->write_data = NULL;
1274
1275     return 0;
1276 }
1277
1278 int pa_stream_write(
1279         pa_stream *s,
1280         const void *data,
1281         size_t length,
1282         pa_free_cb_t free_cb,
1283         int64_t offset,
1284         pa_seek_mode_t seek) {
1285
1286     pa_assert(s);
1287     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1288     pa_assert(data);
1289
1290     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1291     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1292     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1293     PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1294     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1295     PA_CHECK_VALIDITY(s->context,
1296                       !s->write_memblock ||
1297                       ((data >= s->write_data) &&
1298                        ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1299                       PA_ERR_INVALID);
1300     PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1301
1302     if (s->write_memblock) {
1303         pa_memchunk chunk;
1304
1305         /* pa_stream_write_begin() was called before */
1306
1307         pa_memblock_release(s->write_memblock);
1308
1309         chunk.memblock = s->write_memblock;
1310         chunk.index = (const char *) data - (const char *) s->write_data;
1311         chunk.length = length;
1312
1313         s->write_memblock = NULL;
1314         s->write_data = NULL;
1315
1316         pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1317         pa_memblock_unref(chunk.memblock);
1318
1319     } else {
1320         pa_seek_mode_t t_seek = seek;
1321         int64_t t_offset = offset;
1322         size_t t_length = length;
1323         const void *t_data = data;
1324
1325         /* pa_stream_write_begin() was not called before */
1326
1327         while (t_length > 0) {
1328             pa_memchunk chunk;
1329
1330             chunk.index = 0;
1331
1332             if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1333                 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1334                 chunk.length = t_length;
1335             } else {
1336                 void *d;
1337
1338                 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1339                 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1340
1341                 d = pa_memblock_acquire(chunk.memblock);
1342                 memcpy(d, t_data, chunk.length);
1343                 pa_memblock_release(chunk.memblock);
1344             }
1345
1346             pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1347
1348             t_offset = 0;
1349             t_seek = PA_SEEK_RELATIVE;
1350
1351             t_data = (const uint8_t*) t_data + chunk.length;
1352             t_length -= chunk.length;
1353
1354             pa_memblock_unref(chunk.memblock);
1355         }
1356
1357         if (free_cb && pa_pstream_get_shm(s->context->pstream))
1358             free_cb((void*) data);
1359     }
1360
1361     /* This is obviously wrong since we ignore the seeking index . But
1362      * that's OK, the server side applies the same error */
1363     s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1364
1365     if (s->direction == PA_STREAM_PLAYBACK) {
1366
1367         /* Update latency request correction */
1368         if (s->write_index_corrections[s->current_write_index_correction].valid) {
1369
1370             if (seek == PA_SEEK_ABSOLUTE) {
1371                 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1372                 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1373                 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1374             } else if (seek == PA_SEEK_RELATIVE) {
1375                 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1376                     s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1377             } else
1378                 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1379         }
1380
1381         /* Update the write index in the already available latency data */
1382         if (s->timing_info_valid) {
1383
1384             if (seek == PA_SEEK_ABSOLUTE) {
1385                 s->timing_info.write_index_corrupt = FALSE;
1386                 s->timing_info.write_index = offset + (int64_t) length;
1387             } else if (seek == PA_SEEK_RELATIVE) {
1388                 if (!s->timing_info.write_index_corrupt)
1389                     s->timing_info.write_index += offset + (int64_t) length;
1390             } else
1391                 s->timing_info.write_index_corrupt = TRUE;
1392         }
1393
1394         if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1395             request_auto_timing_update(s, TRUE);
1396     }
1397
1398     return 0;
1399 }
1400
1401 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1402     pa_assert(s);
1403     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1404     pa_assert(data);
1405     pa_assert(length);
1406
1407     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1408     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1409     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1410
1411     if (!s->peek_memchunk.memblock) {
1412
1413         if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1414             *data = NULL;
1415             *length = 0;
1416             return 0;
1417         }
1418
1419         s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1420     }
1421
1422     pa_assert(s->peek_data);
1423     *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1424     *length = s->peek_memchunk.length;
1425     return 0;
1426 }
1427
1428 int pa_stream_drop(pa_stream *s) {
1429     pa_assert(s);
1430     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1431
1432     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1433     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1434     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1435     PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1436
1437     pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1438
1439     /* Fix the simulated local read index */
1440     if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1441         s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1442
1443     pa_assert(s->peek_data);
1444     pa_memblock_release(s->peek_memchunk.memblock);
1445     pa_memblock_unref(s->peek_memchunk.memblock);
1446     pa_memchunk_reset(&s->peek_memchunk);
1447
1448     return 0;
1449 }
1450
1451 size_t pa_stream_writable_size(pa_stream *s) {
1452     pa_assert(s);
1453     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1454
1455     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1456     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1457     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1458
1459     return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1460 }
1461
1462 size_t pa_stream_readable_size(pa_stream *s) {
1463     pa_assert(s);
1464     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1465
1466     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1467     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1468     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1469
1470     return pa_memblockq_get_length(s->record_memblockq);
1471 }
1472
1473 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1474     pa_operation *o;
1475     pa_tagstruct *t;
1476     uint32_t tag;
1477
1478     pa_assert(s);
1479     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1480
1481     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1482     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1483     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1484
1485     /* Ask for a timing update before we cork/uncork to get the best
1486      * accuracy for the transport latency suitable for the
1487      * check_smoother_status() call in the started callback */
1488     request_auto_timing_update(s, TRUE);
1489
1490     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1491
1492     t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1493     pa_tagstruct_putu32(t, s->channel);
1494     pa_pstream_send_tagstruct(s->context->pstream, t);
1495     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1496
1497     /* This might cause the read index to conitnue again, hence
1498      * let's request a timing update */
1499     request_auto_timing_update(s, TRUE);
1500
1501     return o;
1502 }
1503
1504 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1505     pa_usec_t usec;
1506
1507     pa_assert(s);
1508     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1509     pa_assert(s->state == PA_STREAM_READY);
1510     pa_assert(s->direction != PA_STREAM_UPLOAD);
1511     pa_assert(s->timing_info_valid);
1512     pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1513     pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1514
1515     if (s->direction == PA_STREAM_PLAYBACK) {
1516         /* The last byte that was written into the output device
1517          * had this time value associated */
1518         usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1519
1520         if (!s->corked && !s->suspended) {
1521
1522             if (!ignore_transport)
1523                 /* Because the latency info took a little time to come
1524                  * to us, we assume that the real output time is actually
1525                  * a little ahead */
1526                 usec += s->timing_info.transport_usec;
1527
1528             /* However, the output device usually maintains a buffer
1529                too, hence the real sample currently played is a little
1530                back  */
1531             if (s->timing_info.sink_usec >= usec)
1532                 usec = 0;
1533             else
1534                 usec -= s->timing_info.sink_usec;
1535         }
1536
1537     } else {
1538         pa_assert(s->direction == PA_STREAM_RECORD);
1539
1540         /* The last byte written into the server side queue had
1541          * this time value associated */
1542         usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1543
1544         if (!s->corked && !s->suspended) {
1545
1546             if (!ignore_transport)
1547                 /* Add transport latency */
1548                 usec += s->timing_info.transport_usec;
1549
1550             /* Add latency of data in device buffer */
1551             usec += s->timing_info.source_usec;
1552
1553             /* If this is a monitor source, we need to correct the
1554              * time by the playback device buffer */
1555             if (s->timing_info.sink_usec >= usec)
1556                 usec = 0;
1557             else
1558                 usec -= s->timing_info.sink_usec;
1559         }
1560     }
1561
1562     return usec;
1563 }
1564
1565 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1566     pa_operation *o = userdata;
1567     struct timeval local, remote, now;
1568     pa_timing_info *i;
1569     pa_bool_t playing = FALSE;
1570     uint64_t underrun_for = 0, playing_for = 0;
1571
1572     pa_assert(pd);
1573     pa_assert(o);
1574     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1575
1576     if (!o->context || !o->stream)
1577         goto finish;
1578
1579     i = &o->stream->timing_info;
1580
1581     o->stream->timing_info_valid = FALSE;
1582     i->write_index_corrupt = TRUE;
1583     i->read_index_corrupt = TRUE;
1584
1585     if (command != PA_COMMAND_REPLY) {
1586         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1587             goto finish;
1588
1589     } else {
1590
1591         if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1592             pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1593             pa_tagstruct_get_boolean(t, &playing) < 0 ||
1594             pa_tagstruct_get_timeval(t, &local) < 0 ||
1595             pa_tagstruct_get_timeval(t, &remote) < 0 ||
1596             pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1597             pa_tagstruct_gets64(t, &i->read_index) < 0) {
1598
1599             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1600             goto finish;
1601         }
1602
1603         if (o->context->version >= 13 &&
1604             o->stream->direction == PA_STREAM_PLAYBACK)
1605             if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1606                 pa_tagstruct_getu64(t, &playing_for) < 0) {
1607
1608                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1609                 goto finish;
1610             }
1611
1612
1613         if (!pa_tagstruct_eof(t)) {
1614             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1615             goto finish;
1616         }
1617         o->stream->timing_info_valid = TRUE;
1618         i->write_index_corrupt = FALSE;
1619         i->read_index_corrupt = FALSE;
1620
1621         i->playing = (int) playing;
1622         i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1623
1624         pa_gettimeofday(&now);
1625
1626         /* Calculcate timestamps */
1627         if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1628             /* local and remote seem to have synchronized clocks */
1629
1630             if (o->stream->direction == PA_STREAM_PLAYBACK)
1631                 i->transport_usec = pa_timeval_diff(&remote, &local);
1632             else
1633                 i->transport_usec = pa_timeval_diff(&now, &remote);
1634
1635             i->synchronized_clocks = TRUE;
1636             i->timestamp = remote;
1637         } else {
1638             /* clocks are not synchronized, let's estimate latency then */
1639             i->transport_usec = pa_timeval_diff(&now, &local)/2;
1640             i->synchronized_clocks = FALSE;
1641             i->timestamp = local;
1642             pa_timeval_add(&i->timestamp, i->transport_usec);
1643         }
1644
1645         /* Invalidate read and write indexes if necessary */
1646         if (tag < o->stream->read_index_not_before)
1647             i->read_index_corrupt = TRUE;
1648
1649         if (tag < o->stream->write_index_not_before)
1650             i->write_index_corrupt = TRUE;
1651
1652         if (o->stream->direction == PA_STREAM_PLAYBACK) {
1653             /* Write index correction */
1654
1655             int n, j;
1656             uint32_t ctag = tag;
1657
1658             /* Go through the saved correction values and add up the
1659              * total correction.*/
1660             for (n = 0, j = o->stream->current_write_index_correction+1;
1661                  n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1662                  n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1663
1664                 /* Step over invalid data or out-of-date data */
1665                 if (!o->stream->write_index_corrections[j].valid ||
1666                     o->stream->write_index_corrections[j].tag < ctag)
1667                     continue;
1668
1669                 /* Make sure that everything is in order */
1670                 ctag = o->stream->write_index_corrections[j].tag+1;
1671
1672                 /* Now fix the write index */
1673                 if (o->stream->write_index_corrections[j].corrupt) {
1674                     /* A corrupting seek was made */
1675                     i->write_index_corrupt = TRUE;
1676                 } else if (o->stream->write_index_corrections[j].absolute) {
1677                     /* An absolute seek was made */
1678                     i->write_index = o->stream->write_index_corrections[j].value;
1679                     i->write_index_corrupt = FALSE;
1680                 } else if (!i->write_index_corrupt) {
1681                     /* A relative seek was made */
1682                     i->write_index += o->stream->write_index_corrections[j].value;
1683                 }
1684             }
1685
1686             /* Clear old correction entries */
1687             for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1688                 if (!o->stream->write_index_corrections[n].valid)
1689                     continue;
1690
1691                 if (o->stream->write_index_corrections[n].tag <= tag)
1692                     o->stream->write_index_corrections[n].valid = FALSE;
1693             }
1694         }
1695
1696         if (o->stream->direction == PA_STREAM_RECORD) {
1697             /* Read index correction */
1698
1699             if (!i->read_index_corrupt)
1700                 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1701         }
1702
1703         /* Update smoother */
1704         if (o->stream->smoother) {
1705             pa_usec_t u, x;
1706
1707             u = x = pa_rtclock_now() - i->transport_usec;
1708
1709             if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1710                 pa_usec_t su;
1711
1712                 /* If we weren't playing then it will take some time
1713                  * until the audio will actually come out through the
1714                  * speakers. Since we follow that timing here, we need
1715                  * to try to fix this up */
1716
1717                 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1718
1719                 if (su < i->sink_usec)
1720                     x += i->sink_usec - su;
1721             }
1722
1723             if (!i->playing)
1724                 pa_smoother_pause(o->stream->smoother, x);
1725
1726             /* Update the smoother */
1727             if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1728                 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1729                 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1730
1731             if (i->playing)
1732                 pa_smoother_resume(o->stream->smoother, x, TRUE);
1733         }
1734     }
1735
1736     o->stream->auto_timing_update_requested = FALSE;
1737
1738     if (o->stream->latency_update_callback)
1739         o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1740
1741     if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1742         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1743         cb(o->stream, o->stream->timing_info_valid, o->userdata);
1744     }
1745
1746 finish:
1747
1748     pa_operation_done(o);
1749     pa_operation_unref(o);
1750 }
1751
1752 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1753     uint32_t tag;
1754     pa_operation *o;
1755     pa_tagstruct *t;
1756     struct timeval now;
1757     int cidx = 0;
1758
1759     pa_assert(s);
1760     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1761
1762     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1763     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1764     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1765
1766     if (s->direction == PA_STREAM_PLAYBACK) {
1767         /* Find a place to store the write_index correction data for this entry */
1768         cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1769
1770         /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1771         PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1772     }
1773     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1774
1775     t = pa_tagstruct_command(
1776             s->context,
1777             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1778             &tag);
1779     pa_tagstruct_putu32(t, s->channel);
1780     pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1781
1782     pa_pstream_send_tagstruct(s->context->pstream, t);
1783     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1784
1785     if (s->direction == PA_STREAM_PLAYBACK) {
1786         /* Fill in initial correction data */
1787
1788         s->current_write_index_correction = cidx;
1789
1790         s->write_index_corrections[cidx].valid = TRUE;
1791         s->write_index_corrections[cidx].absolute = FALSE;
1792         s->write_index_corrections[cidx].corrupt = FALSE;
1793         s->write_index_corrections[cidx].tag = tag;
1794         s->write_index_corrections[cidx].value = 0;
1795     }
1796
1797     return o;
1798 }
1799
1800 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1801     pa_stream *s = userdata;
1802
1803     pa_assert(pd);
1804     pa_assert(s);
1805     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1806
1807     pa_stream_ref(s);
1808
1809     if (command != PA_COMMAND_REPLY) {
1810         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1811             goto finish;
1812
1813         pa_stream_set_state(s, PA_STREAM_FAILED);
1814         goto finish;
1815     } else if (!pa_tagstruct_eof(t)) {
1816         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1817         goto finish;
1818     }
1819
1820     pa_stream_set_state(s, PA_STREAM_TERMINATED);
1821
1822 finish:
1823     pa_stream_unref(s);
1824 }
1825
1826 int pa_stream_disconnect(pa_stream *s) {
1827     pa_tagstruct *t;
1828     uint32_t tag;
1829
1830     pa_assert(s);
1831     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1832
1833     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1834     PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1835     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1836
1837     pa_stream_ref(s);
1838
1839     t = pa_tagstruct_command(
1840             s->context,
1841             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1842                         (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1843             &tag);
1844     pa_tagstruct_putu32(t, s->channel);
1845     pa_pstream_send_tagstruct(s->context->pstream, t);
1846     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1847
1848     pa_stream_unref(s);
1849     return 0;
1850 }
1851
1852 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1853     pa_assert(s);
1854     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1855
1856     if (pa_detect_fork())
1857         return;
1858
1859     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1860         return;
1861
1862     s->read_callback = cb;
1863     s->read_userdata = userdata;
1864 }
1865
1866 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1867     pa_assert(s);
1868     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1869
1870     if (pa_detect_fork())
1871         return;
1872
1873     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1874         return;
1875
1876     s->write_callback = cb;
1877     s->write_userdata = userdata;
1878 }
1879
1880 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1881     pa_assert(s);
1882     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1883
1884     if (pa_detect_fork())
1885         return;
1886
1887     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1888         return;
1889
1890     s->state_callback = cb;
1891     s->state_userdata = userdata;
1892 }
1893
1894 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1895     pa_assert(s);
1896     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1897
1898     if (pa_detect_fork())
1899         return;
1900
1901     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1902         return;
1903
1904     s->overflow_callback = cb;
1905     s->overflow_userdata = userdata;
1906 }
1907
1908 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1909     pa_assert(s);
1910     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1911
1912     if (pa_detect_fork())
1913         return;
1914
1915     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1916         return;
1917
1918     s->underflow_callback = cb;
1919     s->underflow_userdata = userdata;
1920 }
1921
1922 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1923     pa_assert(s);
1924     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1925
1926     if (pa_detect_fork())
1927         return;
1928
1929     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1930         return;
1931
1932     s->latency_update_callback = cb;
1933     s->latency_update_userdata = userdata;
1934 }
1935
1936 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1937     pa_assert(s);
1938     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1939
1940     if (pa_detect_fork())
1941         return;
1942
1943     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1944         return;
1945
1946     s->moved_callback = cb;
1947     s->moved_userdata = userdata;
1948 }
1949
1950 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1951     pa_assert(s);
1952     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1953
1954     if (pa_detect_fork())
1955         return;
1956
1957     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1958         return;
1959
1960     s->suspended_callback = cb;
1961     s->suspended_userdata = userdata;
1962 }
1963
1964 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1965     pa_assert(s);
1966     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1967
1968     if (pa_detect_fork())
1969         return;
1970
1971     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1972         return;
1973
1974     s->started_callback = cb;
1975     s->started_userdata = userdata;
1976 }
1977
1978 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
1979     pa_assert(s);
1980     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1981
1982     if (pa_detect_fork())
1983         return;
1984
1985     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1986         return;
1987
1988     s->event_callback = cb;
1989     s->event_userdata = userdata;
1990 }
1991
1992 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1993     pa_assert(s);
1994     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1995
1996     if (pa_detect_fork())
1997         return;
1998
1999     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2000         return;
2001
2002     s->buffer_attr_callback = cb;
2003     s->buffer_attr_userdata = userdata;
2004 }
2005
2006 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2007     pa_operation *o = userdata;
2008     int success = 1;
2009
2010     pa_assert(pd);
2011     pa_assert(o);
2012     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2013
2014     if (!o->context)
2015         goto finish;
2016
2017     if (command != PA_COMMAND_REPLY) {
2018         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2019             goto finish;
2020
2021         success = 0;
2022     } else if (!pa_tagstruct_eof(t)) {
2023         pa_context_fail(o->context, PA_ERR_PROTOCOL);
2024         goto finish;
2025     }
2026
2027     if (o->callback) {
2028         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2029         cb(o->stream, success, o->userdata);
2030     }
2031
2032 finish:
2033     pa_operation_done(o);
2034     pa_operation_unref(o);
2035 }
2036
2037 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2038     pa_operation *o;
2039     pa_tagstruct *t;
2040     uint32_t tag;
2041
2042     pa_assert(s);
2043     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2044
2045     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2046     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2047     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2048
2049     /* Ask for a timing update before we cork/uncork to get the best
2050      * accuracy for the transport latency suitable for the
2051      * check_smoother_status() call in the started callback */
2052     request_auto_timing_update(s, TRUE);
2053
2054     s->corked = b;
2055
2056     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2057
2058     t = pa_tagstruct_command(
2059             s->context,
2060             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2061             &tag);
2062     pa_tagstruct_putu32(t, s->channel);
2063     pa_tagstruct_put_boolean(t, !!b);
2064     pa_pstream_send_tagstruct(s->context->pstream, t);
2065     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2066
2067     check_smoother_status(s, FALSE, FALSE, FALSE);
2068
2069     /* This might cause the indexes to hang/start again, hence let's
2070      * request a timing update, after the cork/uncork, too */
2071     request_auto_timing_update(s, TRUE);
2072
2073     return o;
2074 }
2075
2076 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2077     pa_tagstruct *t;
2078     pa_operation *o;
2079     uint32_t tag;
2080
2081     pa_assert(s);
2082     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2083
2084     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2085     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2086
2087     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2088
2089     t = pa_tagstruct_command(s->context, command, &tag);
2090     pa_tagstruct_putu32(t, s->channel);
2091     pa_pstream_send_tagstruct(s->context->pstream, t);
2092     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2093
2094     return o;
2095 }
2096
2097 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2098     pa_operation *o;
2099
2100     pa_assert(s);
2101     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2102
2103     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2104     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2105     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2106
2107     /* Ask for a timing update *before* the flush, so that the
2108      * transport usec is as up to date as possible when we get the
2109      * underflow message and update the smoother status*/
2110     request_auto_timing_update(s, TRUE);
2111
2112     if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2113         return NULL;
2114
2115     if (s->direction == PA_STREAM_PLAYBACK) {
2116
2117         if (s->write_index_corrections[s->current_write_index_correction].valid)
2118             s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2119
2120         if (s->buffer_attr.prebuf > 0)
2121             check_smoother_status(s, FALSE, FALSE, TRUE);
2122
2123         /* This will change the write index, but leave the
2124          * read index untouched. */
2125         invalidate_indexes(s, FALSE, TRUE);
2126
2127     } else
2128         /* For record streams this has no influence on the write
2129          * index, but the read index might jump. */
2130         invalidate_indexes(s, TRUE, FALSE);
2131
2132     return o;
2133 }
2134
2135 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2136     pa_operation *o;
2137
2138     pa_assert(s);
2139     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2140
2141     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2142     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2143     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2144     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2145
2146     /* Ask for a timing update before we cork/uncork to get the best
2147      * accuracy for the transport latency suitable for the
2148      * check_smoother_status() call in the started callback */
2149     request_auto_timing_update(s, TRUE);
2150
2151     if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2152         return NULL;
2153
2154     /* This might cause the read index to hang again, hence
2155      * let's request a timing update */
2156     request_auto_timing_update(s, TRUE);
2157
2158     return o;
2159 }
2160
2161 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2162     pa_operation *o;
2163
2164     pa_assert(s);
2165     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2166
2167     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2168     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2169     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2170     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2171
2172     /* Ask for a timing update before we cork/uncork to get the best
2173      * accuracy for the transport latency suitable for the
2174      * check_smoother_status() call in the started callback */
2175     request_auto_timing_update(s, TRUE);
2176
2177     if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2178         return NULL;
2179
2180     /* This might cause the read index to start moving again, hence
2181      * let's request a timing update */
2182     request_auto_timing_update(s, TRUE);
2183
2184     return o;
2185 }
2186
2187 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2188     pa_operation *o;
2189
2190     pa_assert(s);
2191     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2192     pa_assert(name);
2193
2194     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2195     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2196     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2197
2198     if (s->context->version >= 13) {
2199         pa_proplist *p = pa_proplist_new();
2200
2201         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2202         o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2203         pa_proplist_free(p);
2204     } else {
2205         pa_tagstruct *t;
2206         uint32_t tag;
2207
2208         o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2209         t = pa_tagstruct_command(
2210                 s->context,
2211                 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2212                 &tag);
2213         pa_tagstruct_putu32(t, s->channel);
2214         pa_tagstruct_puts(t, name);
2215         pa_pstream_send_tagstruct(s->context->pstream, t);
2216         pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2217     }
2218
2219     return o;
2220 }
2221
2222 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2223     pa_usec_t usec;
2224
2225     pa_assert(s);
2226     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2227
2228     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2229     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2230     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2231     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2232     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2233     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2234
2235     if (s->smoother)
2236         usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2237     else
2238         usec = calc_time(s, FALSE);
2239
2240     /* Make sure the time runs monotonically */
2241     if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2242         if (usec < s->previous_time)
2243             usec = s->previous_time;
2244         else
2245             s->previous_time = usec;
2246     }
2247
2248     if (r_usec)
2249         *r_usec = usec;
2250
2251     return 0;
2252 }
2253
2254 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2255     pa_assert(s);
2256     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2257
2258     if (negative)
2259         *negative = 0;
2260
2261     if (a >= b)
2262         return a-b;
2263     else {
2264         if (negative && s->direction == PA_STREAM_RECORD) {
2265             *negative = 1;
2266             return b-a;
2267         } else
2268             return 0;
2269     }
2270 }
2271
2272 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2273     pa_usec_t t, c;
2274     int r;
2275     int64_t cindex;
2276
2277     pa_assert(s);
2278     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2279     pa_assert(r_usec);
2280
2281     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2282     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2283     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2284     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2285     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2286     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2287
2288     if ((r = pa_stream_get_time(s, &t)) < 0)
2289         return r;
2290
2291     if (s->direction == PA_STREAM_PLAYBACK)
2292         cindex = s->timing_info.write_index;
2293     else
2294         cindex = s->timing_info.read_index;
2295
2296     if (cindex < 0)
2297         cindex = 0;
2298
2299     c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2300
2301     if (s->direction == PA_STREAM_PLAYBACK)
2302         *r_usec = time_counter_diff(s, c, t, negative);
2303     else
2304         *r_usec = time_counter_diff(s, t, c, negative);
2305
2306     return 0;
2307 }
2308
2309 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2310     pa_assert(s);
2311     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2312
2313     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2314     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2315     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2316     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2317
2318     return &s->timing_info;
2319 }
2320
2321 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2322     pa_assert(s);
2323     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2324
2325     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2326
2327     return &s->sample_spec;
2328 }
2329
2330 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2331     pa_assert(s);
2332     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2333
2334     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2335
2336     return &s->channel_map;
2337 }
2338
2339 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2340     pa_assert(s);
2341     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2342
2343     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2344     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2345     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2346
2347     return &s->buffer_attr;
2348 }
2349
2350 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2351     pa_operation *o = userdata;
2352     int success = 1;
2353
2354     pa_assert(pd);
2355     pa_assert(o);
2356     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2357
2358     if (!o->context)
2359         goto finish;
2360
2361     if (command != PA_COMMAND_REPLY) {
2362         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2363             goto finish;
2364
2365         success = 0;
2366     } else {
2367         if (o->stream->direction == PA_STREAM_PLAYBACK) {
2368             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2369                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2370                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2371                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2372                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2373                 goto finish;
2374             }
2375         } else if (o->stream->direction == PA_STREAM_RECORD) {
2376             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2377                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2378                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2379                 goto finish;
2380             }
2381         }
2382
2383         if (o->stream->context->version >= 13) {
2384             pa_usec_t usec;
2385
2386             if (pa_tagstruct_get_usec(t, &usec) < 0) {
2387                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2388                 goto finish;
2389             }
2390
2391             if (o->stream->direction == PA_STREAM_RECORD)
2392                 o->stream->timing_info.configured_source_usec = usec;
2393             else
2394                 o->stream->timing_info.configured_sink_usec = usec;
2395         }
2396
2397         if (!pa_tagstruct_eof(t)) {
2398             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2399             goto finish;
2400         }
2401     }
2402
2403     if (o->callback) {
2404         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2405         cb(o->stream, success, o->userdata);
2406     }
2407
2408 finish:
2409     pa_operation_done(o);
2410     pa_operation_unref(o);
2411 }
2412
2413
2414 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2415     pa_operation *o;
2416     pa_tagstruct *t;
2417     uint32_t tag;
2418
2419     pa_assert(s);
2420     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2421     pa_assert(attr);
2422
2423     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2424     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2425     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2426     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2427
2428     /* Ask for a timing update before we cork/uncork to get the best
2429      * accuracy for the transport latency suitable for the
2430      * check_smoother_status() call in the started callback */
2431     request_auto_timing_update(s, TRUE);
2432
2433     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2434
2435     t = pa_tagstruct_command(
2436             s->context,
2437             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2438             &tag);
2439     pa_tagstruct_putu32(t, s->channel);
2440
2441     pa_tagstruct_putu32(t, attr->maxlength);
2442
2443     if (s->direction == PA_STREAM_PLAYBACK)
2444         pa_tagstruct_put(
2445                 t,
2446                 PA_TAG_U32, attr->tlength,
2447                 PA_TAG_U32, attr->prebuf,
2448                 PA_TAG_U32, attr->minreq,
2449                 PA_TAG_INVALID);
2450     else
2451         pa_tagstruct_putu32(t, attr->fragsize);
2452
2453     if (s->context->version >= 13)
2454         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2455
2456     if (s->context->version >= 14)
2457         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2458
2459     pa_pstream_send_tagstruct(s->context->pstream, t);
2460     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2461
2462     /* This might cause changes in the read/write indexex, hence let's
2463      * request a timing update */
2464     request_auto_timing_update(s, TRUE);
2465
2466     return o;
2467 }
2468
2469 uint32_t pa_stream_get_device_index(pa_stream *s) {
2470     pa_assert(s);
2471     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2472
2473     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2474     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2475     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2476     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2477     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2478
2479     return s->device_index;
2480 }
2481
2482 const char *pa_stream_get_device_name(pa_stream *s) {
2483     pa_assert(s);
2484     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2485
2486     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2487     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2488     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2489     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2490     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2491
2492     return s->device_name;
2493 }
2494
2495 int pa_stream_is_suspended(pa_stream *s) {
2496     pa_assert(s);
2497     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2498
2499     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2500     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2501     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2502     PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2503
2504     return s->suspended;
2505 }
2506
2507 int pa_stream_is_corked(pa_stream *s) {
2508     pa_assert(s);
2509     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2510
2511     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2512     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2513     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2514
2515     return s->corked;
2516 }
2517
2518 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2519     pa_operation *o = userdata;
2520     int success = 1;
2521
2522     pa_assert(pd);
2523     pa_assert(o);
2524     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2525
2526     if (!o->context)
2527         goto finish;
2528
2529     if (command != PA_COMMAND_REPLY) {
2530         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2531             goto finish;
2532
2533         success = 0;
2534     } else {
2535
2536         if (!pa_tagstruct_eof(t)) {
2537             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2538             goto finish;
2539         }
2540     }
2541
2542     o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2543     pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2544
2545     if (o->callback) {
2546         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2547         cb(o->stream, success, o->userdata);
2548     }
2549
2550 finish:
2551     pa_operation_done(o);
2552     pa_operation_unref(o);
2553 }
2554
2555
2556 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2557     pa_operation *o;
2558     pa_tagstruct *t;
2559     uint32_t tag;
2560
2561     pa_assert(s);
2562     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2563
2564     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2565     PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2566     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2567     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2568     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2569     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2570
2571     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2572     o->private = PA_UINT_TO_PTR(rate);
2573
2574     t = pa_tagstruct_command(
2575             s->context,
2576             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2577             &tag);
2578     pa_tagstruct_putu32(t, s->channel);
2579     pa_tagstruct_putu32(t, rate);
2580
2581     pa_pstream_send_tagstruct(s->context->pstream, t);
2582     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2583
2584     return o;
2585 }
2586
2587 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2588     pa_operation *o;
2589     pa_tagstruct *t;
2590     uint32_t tag;
2591
2592     pa_assert(s);
2593     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2594
2595     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2596     PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2597     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2598     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2599     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2600
2601     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2602
2603     t = pa_tagstruct_command(
2604             s->context,
2605             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2606             &tag);
2607     pa_tagstruct_putu32(t, s->channel);
2608     pa_tagstruct_putu32(t, (uint32_t) mode);
2609     pa_tagstruct_put_proplist(t, p);
2610
2611     pa_pstream_send_tagstruct(s->context->pstream, t);
2612     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2613
2614     /* Please note that we don't update s->proplist here, because we
2615      * don't export that field */
2616
2617     return o;
2618 }
2619
2620 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2621     pa_operation *o;
2622     pa_tagstruct *t;
2623     uint32_t tag;
2624     const char * const*k;
2625
2626     pa_assert(s);
2627     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2628
2629     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2630     PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2631     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2632     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2633     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2634
2635     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2636
2637     t = pa_tagstruct_command(
2638             s->context,
2639             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2640             &tag);
2641     pa_tagstruct_putu32(t, s->channel);
2642
2643     for (k = keys; *k; k++)
2644         pa_tagstruct_puts(t, *k);
2645
2646     pa_tagstruct_puts(t, NULL);
2647
2648     pa_pstream_send_tagstruct(s->context->pstream, t);
2649     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2650
2651     /* Please note that we don't update s->proplist here, because we
2652      * don't export that field */
2653
2654     return o;
2655 }
2656
2657 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2658     pa_assert(s);
2659     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2660
2661     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2662     PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2663     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2664     PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2665
2666     s->direct_on_input = sink_input_idx;
2667
2668     return 0;
2669 }
2670
2671 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2672     pa_assert(s);
2673     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2674
2675     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2676     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2677     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2678
2679     return s->direct_on_input;
2680 }