client: implement $PULSE_LATENCY_MSEC
[profile/ivi/pulseaudio.git] / src / pulse / stream.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <pulse/def.h>
32 #include <pulse/stream.h>
33 #include <pulse/timeval.h>
34 #include <pulse/rtclock.h>
35 #include <pulse/xmalloc.h>
36
37 #include <pulsecore/pstream-util.h>
38 #include <pulsecore/log.h>
39 #include <pulsecore/hashmap.h>
40 #include <pulsecore/macro.h>
41 #include <pulsecore/core-rtclock.h>
42 #include <pulsecore/core-util.h>
43
44 #include "fork-detect.h"
45 #include "internal.h"
46
47 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
48 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
49
50 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
51 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
52 #define SMOOTHER_MIN_HISTORY (4)
53
54 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
55     return pa_stream_new_with_proplist(c, name, ss, map, NULL);
56 }
57
58 static void reset_callbacks(pa_stream *s) {
59     s->read_callback = NULL;
60     s->read_userdata = NULL;
61     s->write_callback = NULL;
62     s->write_userdata = NULL;
63     s->state_callback = NULL;
64     s->state_userdata = NULL;
65     s->overflow_callback = NULL;
66     s->overflow_userdata = NULL;
67     s->underflow_callback = NULL;
68     s->underflow_userdata = NULL;
69     s->latency_update_callback = NULL;
70     s->latency_update_userdata = NULL;
71     s->moved_callback = NULL;
72     s->moved_userdata = NULL;
73     s->suspended_callback = NULL;
74     s->suspended_userdata = NULL;
75     s->started_callback = NULL;
76     s->started_userdata = NULL;
77     s->event_callback = NULL;
78     s->event_userdata = NULL;
79     s->buffer_attr_callback = NULL;
80     s->buffer_attr_userdata = NULL;
81 }
82
83 pa_stream *pa_stream_new_with_proplist(
84         pa_context *c,
85         const char *name,
86         const pa_sample_spec *ss,
87         const pa_channel_map *map,
88         pa_proplist *p) {
89
90     pa_stream *s;
91     int i;
92     pa_channel_map tmap;
93
94     pa_assert(c);
95     pa_assert(PA_REFCNT_VALUE(c) >= 1);
96
97     PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
98     PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
99     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
100     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
101     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
102     PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
103     PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
104
105     if (!map)
106         PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
107
108     s = pa_xnew(pa_stream, 1);
109     PA_REFCNT_INIT(s);
110     s->context = c;
111     s->mainloop = c->mainloop;
112
113     s->direction = PA_STREAM_NODIRECTION;
114     s->state = PA_STREAM_UNCONNECTED;
115     s->flags = 0;
116
117     s->sample_spec = *ss;
118     s->channel_map = *map;
119
120     s->direct_on_input = PA_INVALID_INDEX;
121
122     s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
123     if (name)
124         pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
125
126     s->channel = 0;
127     s->channel_valid = FALSE;
128     s->syncid = c->csyncid++;
129     s->stream_index = PA_INVALID_INDEX;
130
131     s->requested_bytes = 0;
132     memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
133
134     /* We initialize der target length here, so that if the user
135      * passes no explicit buffering metrics the default is similar to
136      * what older PA versions provided. */
137
138     s->buffer_attr.maxlength = (uint32_t) -1;
139     s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
140     s->buffer_attr.minreq = (uint32_t) -1;
141     s->buffer_attr.prebuf = (uint32_t) -1;
142     s->buffer_attr.fragsize = (uint32_t) -1;
143
144     s->device_index = PA_INVALID_INDEX;
145     s->device_name = NULL;
146     s->suspended = FALSE;
147     s->corked = FALSE;
148
149     s->write_memblock = NULL;
150     s->write_data = NULL;
151
152     pa_memchunk_reset(&s->peek_memchunk);
153     s->peek_data = NULL;
154     s->record_memblockq = NULL;
155
156     memset(&s->timing_info, 0, sizeof(s->timing_info));
157     s->timing_info_valid = FALSE;
158
159     s->previous_time = 0;
160
161     s->read_index_not_before = 0;
162     s->write_index_not_before = 0;
163     for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
164         s->write_index_corrections[i].valid = 0;
165     s->current_write_index_correction = 0;
166
167     s->auto_timing_update_event = NULL;
168     s->auto_timing_update_requested = FALSE;
169     s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
170
171     reset_callbacks(s);
172
173     s->smoother = NULL;
174
175     /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
176     PA_LLIST_PREPEND(pa_stream, c->streams, s);
177     pa_stream_ref(s);
178
179     return s;
180 }
181
182 static void stream_unlink(pa_stream *s) {
183     pa_operation *o, *n;
184     pa_assert(s);
185
186     if (!s->context)
187         return;
188
189     /* Detach from context */
190
191     /* Unref all operatio object that point to us */
192     for (o = s->context->operations; o; o = n) {
193         n = o->next;
194
195         if (o->stream == s)
196             pa_operation_cancel(o);
197     }
198
199     /* Drop all outstanding replies for this stream */
200     if (s->context->pdispatch)
201         pa_pdispatch_unregister_reply(s->context->pdispatch, s);
202
203     if (s->channel_valid) {
204         pa_hashmap_remove((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, PA_UINT32_TO_PTR(s->channel));
205         s->channel = 0;
206         s->channel_valid = FALSE;
207     }
208
209     PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
210     pa_stream_unref(s);
211
212     s->context = NULL;
213
214     if (s->auto_timing_update_event) {
215         pa_assert(s->mainloop);
216         s->mainloop->time_free(s->auto_timing_update_event);
217     }
218
219     reset_callbacks(s);
220 }
221
222 static void stream_free(pa_stream *s) {
223     pa_assert(s);
224
225     stream_unlink(s);
226
227     if (s->write_memblock) {
228         pa_memblock_release(s->write_memblock);
229         pa_memblock_unref(s->write_data);
230     }
231
232     if (s->peek_memchunk.memblock) {
233         if (s->peek_data)
234             pa_memblock_release(s->peek_memchunk.memblock);
235         pa_memblock_unref(s->peek_memchunk.memblock);
236     }
237
238     if (s->record_memblockq)
239         pa_memblockq_free(s->record_memblockq);
240
241     if (s->proplist)
242         pa_proplist_free(s->proplist);
243
244     if (s->smoother)
245         pa_smoother_free(s->smoother);
246
247     pa_xfree(s->device_name);
248     pa_xfree(s);
249 }
250
251 void pa_stream_unref(pa_stream *s) {
252     pa_assert(s);
253     pa_assert(PA_REFCNT_VALUE(s) >= 1);
254
255     if (PA_REFCNT_DEC(s) <= 0)
256         stream_free(s);
257 }
258
259 pa_stream* pa_stream_ref(pa_stream *s) {
260     pa_assert(s);
261     pa_assert(PA_REFCNT_VALUE(s) >= 1);
262
263     PA_REFCNT_INC(s);
264     return s;
265 }
266
267 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
268     pa_assert(s);
269     pa_assert(PA_REFCNT_VALUE(s) >= 1);
270
271     return s->state;
272 }
273
274 pa_context* pa_stream_get_context(pa_stream *s) {
275     pa_assert(s);
276     pa_assert(PA_REFCNT_VALUE(s) >= 1);
277
278     return s->context;
279 }
280
281 uint32_t pa_stream_get_index(pa_stream *s) {
282     pa_assert(s);
283     pa_assert(PA_REFCNT_VALUE(s) >= 1);
284
285     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
286     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
287
288     return s->stream_index;
289 }
290
291 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
292     pa_assert(s);
293     pa_assert(PA_REFCNT_VALUE(s) >= 1);
294
295     if (s->state == st)
296         return;
297
298     pa_stream_ref(s);
299
300     s->state = st;
301
302     if (s->state_callback)
303         s->state_callback(s, s->state_userdata);
304
305     if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
306         stream_unlink(s);
307
308     pa_stream_unref(s);
309 }
310
311 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
312     pa_assert(s);
313     pa_assert(PA_REFCNT_VALUE(s) >= 1);
314
315     if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
316         return;
317
318     if (s->state == PA_STREAM_READY &&
319         (force || !s->auto_timing_update_requested)) {
320         pa_operation *o;
321
322 /*         pa_log("Automatically requesting new timing data"); */
323
324         if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
325             pa_operation_unref(o);
326             s->auto_timing_update_requested = TRUE;
327         }
328     }
329
330     if (s->auto_timing_update_event) {
331         if (force)
332             s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
333
334         pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
335
336         s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
337     }
338 }
339
340 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
341     pa_context *c = userdata;
342     pa_stream *s;
343     uint32_t channel;
344
345     pa_assert(pd);
346     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
347     pa_assert(t);
348     pa_assert(c);
349     pa_assert(PA_REFCNT_VALUE(c) >= 1);
350
351     pa_context_ref(c);
352
353     if (pa_tagstruct_getu32(t, &channel) < 0 ||
354         !pa_tagstruct_eof(t)) {
355         pa_context_fail(c, PA_ERR_PROTOCOL);
356         goto finish;
357     }
358
359     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
360         goto finish;
361
362     if (s->state != PA_STREAM_READY)
363         goto finish;
364
365     pa_context_set_error(c, PA_ERR_KILLED);
366     pa_stream_set_state(s, PA_STREAM_FAILED);
367
368 finish:
369     pa_context_unref(c);
370 }
371
372 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
373     pa_usec_t x;
374
375     pa_assert(s);
376     pa_assert(!force_start || !force_stop);
377
378     if (!s->smoother)
379         return;
380
381     x = pa_rtclock_now();
382
383     if (s->timing_info_valid) {
384         if (aposteriori)
385             x -= s->timing_info.transport_usec;
386         else
387             x += s->timing_info.transport_usec;
388     }
389
390     if (s->suspended || s->corked || force_stop)
391         pa_smoother_pause(s->smoother, x);
392     else if (force_start || s->buffer_attr.prebuf == 0) {
393
394         if (!s->timing_info_valid &&
395             !aposteriori &&
396             !force_start &&
397             !force_stop &&
398             s->context->version >= 13) {
399
400             /* If the server supports STARTED events we take them as
401              * indications when audio really starts/stops playing, if
402              * we don't have any timing info yet -- instead of trying
403              * to be smart and guessing the server time. Otherwise the
404              * unknown transport delay add too much noise to our time
405              * calculations. */
406
407             return;
408         }
409
410         pa_smoother_resume(s->smoother, x, TRUE);
411     }
412
413     /* Please note that we have no idea if playback actually started
414      * if prebuf is non-zero! */
415 }
416
417 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
418     pa_context *c = userdata;
419     pa_stream *s;
420     uint32_t channel;
421     const char *dn;
422     pa_bool_t suspended;
423     uint32_t di;
424     pa_usec_t usec = 0;
425     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
426
427     pa_assert(pd);
428     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
429     pa_assert(t);
430     pa_assert(c);
431     pa_assert(PA_REFCNT_VALUE(c) >= 1);
432
433     pa_context_ref(c);
434
435     if (c->version < 12) {
436         pa_context_fail(c, PA_ERR_PROTOCOL);
437         goto finish;
438     }
439
440     if (pa_tagstruct_getu32(t, &channel) < 0 ||
441         pa_tagstruct_getu32(t, &di) < 0 ||
442         pa_tagstruct_gets(t, &dn) < 0 ||
443         pa_tagstruct_get_boolean(t, &suspended) < 0) {
444         pa_context_fail(c, PA_ERR_PROTOCOL);
445         goto finish;
446     }
447
448     if (c->version >= 13) {
449
450         if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
451             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
452                 pa_tagstruct_getu32(t, &fragsize) < 0 ||
453                 pa_tagstruct_get_usec(t, &usec) < 0) {
454                 pa_context_fail(c, PA_ERR_PROTOCOL);
455                 goto finish;
456             }
457         } else {
458             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
459                 pa_tagstruct_getu32(t, &tlength) < 0 ||
460                 pa_tagstruct_getu32(t, &prebuf) < 0 ||
461                 pa_tagstruct_getu32(t, &minreq) < 0 ||
462                 pa_tagstruct_get_usec(t, &usec) < 0) {
463                 pa_context_fail(c, PA_ERR_PROTOCOL);
464                 goto finish;
465             }
466         }
467     }
468
469     if (!pa_tagstruct_eof(t)) {
470         pa_context_fail(c, PA_ERR_PROTOCOL);
471         goto finish;
472     }
473
474     if (!dn || di == PA_INVALID_INDEX) {
475         pa_context_fail(c, PA_ERR_PROTOCOL);
476         goto finish;
477     }
478
479     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
480         goto finish;
481
482     if (s->state != PA_STREAM_READY)
483         goto finish;
484
485     if (c->version >= 13) {
486         if (s->direction == PA_STREAM_RECORD)
487             s->timing_info.configured_source_usec = usec;
488         else
489             s->timing_info.configured_sink_usec = usec;
490
491         s->buffer_attr.maxlength = maxlength;
492         s->buffer_attr.fragsize = fragsize;
493         s->buffer_attr.tlength = tlength;
494         s->buffer_attr.prebuf = prebuf;
495         s->buffer_attr.minreq = minreq;
496     }
497
498     pa_xfree(s->device_name);
499     s->device_name = pa_xstrdup(dn);
500     s->device_index = di;
501
502     s->suspended = suspended;
503
504     check_smoother_status(s, TRUE, FALSE, FALSE);
505     request_auto_timing_update(s, TRUE);
506
507     if (s->moved_callback)
508         s->moved_callback(s, s->moved_userdata);
509
510 finish:
511     pa_context_unref(c);
512 }
513
514 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
515     pa_context *c = userdata;
516     pa_stream *s;
517     uint32_t channel;
518     pa_usec_t usec = 0;
519     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
520
521     pa_assert(pd);
522     pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
523     pa_assert(t);
524     pa_assert(c);
525     pa_assert(PA_REFCNT_VALUE(c) >= 1);
526
527     pa_context_ref(c);
528
529     if (c->version < 15) {
530         pa_context_fail(c, PA_ERR_PROTOCOL);
531         goto finish;
532     }
533
534     if (pa_tagstruct_getu32(t, &channel) < 0) {
535         pa_context_fail(c, PA_ERR_PROTOCOL);
536         goto finish;
537     }
538
539     if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
540         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
541             pa_tagstruct_getu32(t, &fragsize) < 0 ||
542             pa_tagstruct_get_usec(t, &usec) < 0) {
543             pa_context_fail(c, PA_ERR_PROTOCOL);
544             goto finish;
545         }
546     } else {
547         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
548             pa_tagstruct_getu32(t, &tlength) < 0 ||
549             pa_tagstruct_getu32(t, &prebuf) < 0 ||
550             pa_tagstruct_getu32(t, &minreq) < 0 ||
551             pa_tagstruct_get_usec(t, &usec) < 0) {
552             pa_context_fail(c, PA_ERR_PROTOCOL);
553             goto finish;
554         }
555     }
556
557     if (!pa_tagstruct_eof(t)) {
558         pa_context_fail(c, PA_ERR_PROTOCOL);
559         goto finish;
560     }
561
562     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
563         goto finish;
564
565     if (s->state != PA_STREAM_READY)
566         goto finish;
567
568     if (s->direction == PA_STREAM_RECORD)
569         s->timing_info.configured_source_usec = usec;
570     else
571         s->timing_info.configured_sink_usec = usec;
572
573     s->buffer_attr.maxlength = maxlength;
574     s->buffer_attr.fragsize = fragsize;
575     s->buffer_attr.tlength = tlength;
576     s->buffer_attr.prebuf = prebuf;
577     s->buffer_attr.minreq = minreq;
578
579     request_auto_timing_update(s, TRUE);
580
581     if (s->buffer_attr_callback)
582         s->buffer_attr_callback(s, s->buffer_attr_userdata);
583
584 finish:
585     pa_context_unref(c);
586 }
587
588 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
589     pa_context *c = userdata;
590     pa_stream *s;
591     uint32_t channel;
592     pa_bool_t suspended;
593
594     pa_assert(pd);
595     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
596     pa_assert(t);
597     pa_assert(c);
598     pa_assert(PA_REFCNT_VALUE(c) >= 1);
599
600     pa_context_ref(c);
601
602     if (c->version < 12) {
603         pa_context_fail(c, PA_ERR_PROTOCOL);
604         goto finish;
605     }
606
607     if (pa_tagstruct_getu32(t, &channel) < 0 ||
608         pa_tagstruct_get_boolean(t, &suspended) < 0 ||
609         !pa_tagstruct_eof(t)) {
610         pa_context_fail(c, PA_ERR_PROTOCOL);
611         goto finish;
612     }
613
614     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
615         goto finish;
616
617     if (s->state != PA_STREAM_READY)
618         goto finish;
619
620     s->suspended = suspended;
621
622     check_smoother_status(s, TRUE, FALSE, FALSE);
623     request_auto_timing_update(s, TRUE);
624
625     if (s->suspended_callback)
626         s->suspended_callback(s, s->suspended_userdata);
627
628 finish:
629     pa_context_unref(c);
630 }
631
632 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
633     pa_context *c = userdata;
634     pa_stream *s;
635     uint32_t channel;
636
637     pa_assert(pd);
638     pa_assert(command == PA_COMMAND_STARTED);
639     pa_assert(t);
640     pa_assert(c);
641     pa_assert(PA_REFCNT_VALUE(c) >= 1);
642
643     pa_context_ref(c);
644
645     if (c->version < 13) {
646         pa_context_fail(c, PA_ERR_PROTOCOL);
647         goto finish;
648     }
649
650     if (pa_tagstruct_getu32(t, &channel) < 0 ||
651         !pa_tagstruct_eof(t)) {
652         pa_context_fail(c, PA_ERR_PROTOCOL);
653         goto finish;
654     }
655
656     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
657         goto finish;
658
659     if (s->state != PA_STREAM_READY)
660         goto finish;
661
662     check_smoother_status(s, TRUE, TRUE, FALSE);
663     request_auto_timing_update(s, TRUE);
664
665     if (s->started_callback)
666         s->started_callback(s, s->started_userdata);
667
668 finish:
669     pa_context_unref(c);
670 }
671
672 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
673     pa_context *c = userdata;
674     pa_stream *s;
675     uint32_t channel;
676     pa_proplist *pl = NULL;
677     const char *event;
678
679     pa_assert(pd);
680     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
681     pa_assert(t);
682     pa_assert(c);
683     pa_assert(PA_REFCNT_VALUE(c) >= 1);
684
685     pa_context_ref(c);
686
687     if (c->version < 15) {
688         pa_context_fail(c, PA_ERR_PROTOCOL);
689         goto finish;
690     }
691
692     pl = pa_proplist_new();
693
694     if (pa_tagstruct_getu32(t, &channel) < 0 ||
695         pa_tagstruct_gets(t, &event) < 0 ||
696         pa_tagstruct_get_proplist(t, pl) < 0 ||
697         !pa_tagstruct_eof(t) || !event) {
698         pa_context_fail(c, PA_ERR_PROTOCOL);
699         goto finish;
700     }
701
702     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
703         goto finish;
704
705     if (s->state != PA_STREAM_READY)
706         goto finish;
707
708     if (s->event_callback)
709         s->event_callback(s, event, pl, s->event_userdata);
710
711 finish:
712     pa_context_unref(c);
713
714     if (pl)
715         pa_proplist_free(pl);
716 }
717
718 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
719     pa_stream *s;
720     pa_context *c = userdata;
721     uint32_t bytes, channel;
722
723     pa_assert(pd);
724     pa_assert(command == PA_COMMAND_REQUEST);
725     pa_assert(t);
726     pa_assert(c);
727     pa_assert(PA_REFCNT_VALUE(c) >= 1);
728
729     pa_context_ref(c);
730
731     if (pa_tagstruct_getu32(t, &channel) < 0 ||
732         pa_tagstruct_getu32(t, &bytes) < 0 ||
733         !pa_tagstruct_eof(t)) {
734         pa_context_fail(c, PA_ERR_PROTOCOL);
735         goto finish;
736     }
737
738     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
739         goto finish;
740
741     if (s->state != PA_STREAM_READY)
742         goto finish;
743
744     s->requested_bytes += bytes;
745
746     /* pa_log("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes); */
747
748     if (s->requested_bytes > 0 && s->write_callback)
749         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
750
751 finish:
752     pa_context_unref(c);
753 }
754
755 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
756     pa_stream *s;
757     pa_context *c = userdata;
758     uint32_t channel;
759
760     pa_assert(pd);
761     pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
762     pa_assert(t);
763     pa_assert(c);
764     pa_assert(PA_REFCNT_VALUE(c) >= 1);
765
766     pa_context_ref(c);
767
768     if (pa_tagstruct_getu32(t, &channel) < 0 ||
769         !pa_tagstruct_eof(t)) {
770         pa_context_fail(c, PA_ERR_PROTOCOL);
771         goto finish;
772     }
773
774     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
775         goto finish;
776
777     if (s->state != PA_STREAM_READY)
778         goto finish;
779
780     if (s->buffer_attr.prebuf > 0)
781         check_smoother_status(s, TRUE, FALSE, TRUE);
782
783     request_auto_timing_update(s, TRUE);
784
785     if (command == PA_COMMAND_OVERFLOW) {
786         if (s->overflow_callback)
787             s->overflow_callback(s, s->overflow_userdata);
788     } else if (command == PA_COMMAND_UNDERFLOW) {
789         if (s->underflow_callback)
790             s->underflow_callback(s, s->underflow_userdata);
791     }
792
793  finish:
794     pa_context_unref(c);
795 }
796
797 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
798     pa_assert(s);
799     pa_assert(PA_REFCNT_VALUE(s) >= 1);
800
801 /*     pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
802
803     if (s->state != PA_STREAM_READY)
804         return;
805
806     if (w) {
807         s->write_index_not_before = s->context->ctag;
808
809         if (s->timing_info_valid)
810             s->timing_info.write_index_corrupt = TRUE;
811
812 /*         pa_log("write_index invalidated"); */
813     }
814
815     if (r) {
816         s->read_index_not_before = s->context->ctag;
817
818         if (s->timing_info_valid)
819             s->timing_info.read_index_corrupt = TRUE;
820
821 /*         pa_log("read_index invalidated"); */
822     }
823
824     request_auto_timing_update(s, TRUE);
825 }
826
827 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
828     pa_stream *s = userdata;
829
830     pa_assert(s);
831     pa_assert(PA_REFCNT_VALUE(s) >= 1);
832
833     pa_stream_ref(s);
834     request_auto_timing_update(s, FALSE);
835     pa_stream_unref(s);
836 }
837
838 static void create_stream_complete(pa_stream *s) {
839     pa_assert(s);
840     pa_assert(PA_REFCNT_VALUE(s) >= 1);
841     pa_assert(s->state == PA_STREAM_CREATING);
842
843     pa_stream_set_state(s, PA_STREAM_READY);
844
845     if (s->requested_bytes > 0 && s->write_callback)
846         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
847
848     if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
849         s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
850         pa_assert(!s->auto_timing_update_event);
851         s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
852
853         request_auto_timing_update(s, TRUE);
854     }
855
856     check_smoother_status(s, TRUE, FALSE, FALSE);
857 }
858
859 static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
860     const char *e;
861
862     pa_assert(s);
863     pa_assert(attr);
864
865     if ((e = getenv("PULSE_LATENCY_MSEC"))) {
866         uint32_t ms;
867
868         if (pa_atou(e, &ms) < 0 || ms <= 0)
869             pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
870         else {
871             attr->maxlength = (uint32_t) -1;
872             attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &s->sample_spec);
873             attr->minreq = (uint32_t) -1;
874             attr->prebuf = (uint32_t) -1;
875             attr->fragsize = attr->tlength;
876         }
877
878         if (flags)
879             *flags |= PA_STREAM_ADJUST_LATENCY;
880     }
881
882     if (s->context->version >= 13)
883         return;
884
885     /* Version older than 0.9.10 didn't do server side buffer_attr
886      * selection, hence we have to fake it on the client side. */
887
888     /* We choose fairly conservative values here, to not confuse
889      * old clients with extremely large playback buffers */
890
891     if (attr->maxlength == (uint32_t) -1)
892         attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
893
894     if (attr->tlength == (uint32_t) -1)
895         attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */
896
897     if (attr->minreq == (uint32_t) -1)
898         attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
899
900     if (attr->prebuf == (uint32_t) -1)
901         attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
902
903     if (attr->fragsize == (uint32_t) -1)
904         attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
905 }
906
907 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
908     pa_stream *s = userdata;
909     uint32_t requested_bytes = 0;
910
911     pa_assert(pd);
912     pa_assert(s);
913     pa_assert(PA_REFCNT_VALUE(s) >= 1);
914     pa_assert(s->state == PA_STREAM_CREATING);
915
916     pa_stream_ref(s);
917
918     if (command != PA_COMMAND_REPLY) {
919         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
920             goto finish;
921
922         pa_stream_set_state(s, PA_STREAM_FAILED);
923         goto finish;
924     }
925
926     if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
927         s->channel == PA_INVALID_INDEX ||
928         ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 ||  s->stream_index == PA_INVALID_INDEX)) ||
929         ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
930         pa_context_fail(s->context, PA_ERR_PROTOCOL);
931         goto finish;
932     }
933
934     s->requested_bytes = (int64_t) requested_bytes;
935
936     if (s->context->version >= 9) {
937         if (s->direction == PA_STREAM_PLAYBACK) {
938             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
939                 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
940                 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
941                 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
942                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
943                 goto finish;
944             }
945         } else if (s->direction == PA_STREAM_RECORD) {
946             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
947                 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
948                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
949                 goto finish;
950             }
951         }
952     }
953
954     if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
955         pa_sample_spec ss;
956         pa_channel_map cm;
957         const char *dn = NULL;
958         pa_bool_t suspended;
959
960         if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
961             pa_tagstruct_get_channel_map(t, &cm) < 0 ||
962             pa_tagstruct_getu32(t, &s->device_index) < 0 ||
963             pa_tagstruct_gets(t, &dn) < 0 ||
964             pa_tagstruct_get_boolean(t, &suspended) < 0) {
965             pa_context_fail(s->context, PA_ERR_PROTOCOL);
966             goto finish;
967         }
968
969         if (!dn || s->device_index == PA_INVALID_INDEX ||
970             ss.channels != cm.channels ||
971             !pa_channel_map_valid(&cm) ||
972             !pa_sample_spec_valid(&ss) ||
973             (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
974             (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
975             (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))) {
976             pa_context_fail(s->context, PA_ERR_PROTOCOL);
977             goto finish;
978         }
979
980         pa_xfree(s->device_name);
981         s->device_name = pa_xstrdup(dn);
982         s->suspended = suspended;
983
984         s->channel_map = cm;
985         s->sample_spec = ss;
986     }
987
988     if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
989         pa_usec_t usec;
990
991         if (pa_tagstruct_get_usec(t, &usec) < 0) {
992             pa_context_fail(s->context, PA_ERR_PROTOCOL);
993             goto finish;
994         }
995
996         if (s->direction == PA_STREAM_RECORD)
997             s->timing_info.configured_source_usec = usec;
998         else
999             s->timing_info.configured_sink_usec = usec;
1000     }
1001
1002     if (!pa_tagstruct_eof(t)) {
1003         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1004         goto finish;
1005     }
1006
1007     if (s->direction == PA_STREAM_RECORD) {
1008         pa_assert(!s->record_memblockq);
1009
1010         s->record_memblockq = pa_memblockq_new(
1011                 0,
1012                 s->buffer_attr.maxlength,
1013                 0,
1014                 pa_frame_size(&s->sample_spec),
1015                 1,
1016                 0,
1017                 0,
1018                 NULL);
1019     }
1020
1021     s->channel_valid = TRUE;
1022     pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);
1023
1024     create_stream_complete(s);
1025
1026 finish:
1027     pa_stream_unref(s);
1028 }
1029
1030 static int create_stream(
1031         pa_stream_direction_t direction,
1032         pa_stream *s,
1033         const char *dev,
1034         const pa_buffer_attr *attr,
1035         pa_stream_flags_t flags,
1036         const pa_cvolume *volume,
1037         pa_stream *sync_stream) {
1038
1039     pa_tagstruct *t;
1040     uint32_t tag;
1041     pa_bool_t volume_set = FALSE;
1042
1043     pa_assert(s);
1044     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1045     pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1046
1047     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1048     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1049     PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1050     PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1051                                               PA_STREAM_INTERPOLATE_TIMING|
1052                                               PA_STREAM_NOT_MONOTONIC|
1053                                               PA_STREAM_AUTO_TIMING_UPDATE|
1054                                               PA_STREAM_NO_REMAP_CHANNELS|
1055                                               PA_STREAM_NO_REMIX_CHANNELS|
1056                                               PA_STREAM_FIX_FORMAT|
1057                                               PA_STREAM_FIX_RATE|
1058                                               PA_STREAM_FIX_CHANNELS|
1059                                               PA_STREAM_DONT_MOVE|
1060                                               PA_STREAM_VARIABLE_RATE|
1061                                               PA_STREAM_PEAK_DETECT|
1062                                               PA_STREAM_START_MUTED|
1063                                               PA_STREAM_ADJUST_LATENCY|
1064                                               PA_STREAM_EARLY_REQUESTS|
1065                                               PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1066                                               PA_STREAM_START_UNMUTED|
1067                                               PA_STREAM_FAIL_ON_SUSPEND|
1068                                               PA_STREAM_RELATIVE_VOLUME)), PA_ERR_INVALID);
1069
1070     PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1071     PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1072     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1073     /* Althought some of the other flags are not supported on older
1074      * version, we don't check for them here, because it doesn't hurt
1075      * when they are passed but actually not supported. This makes
1076      * client development easier */
1077
1078     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1079     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1080     PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
1081     PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1082     PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1083
1084     pa_stream_ref(s);
1085
1086     s->direction = direction;
1087
1088     if (sync_stream)
1089         s->syncid = sync_stream->syncid;
1090
1091     if (attr)
1092         s->buffer_attr = *attr;
1093     patch_buffer_attr(s, &s->buffer_attr, &flags);
1094
1095     s->flags = flags;
1096     s->corked = !!(flags & PA_STREAM_START_CORKED);
1097
1098     if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1099         pa_usec_t x;
1100
1101         x = pa_rtclock_now();
1102
1103         pa_assert(!s->smoother);
1104         s->smoother = pa_smoother_new(
1105                 SMOOTHER_ADJUST_TIME,
1106                 SMOOTHER_HISTORY_TIME,
1107                 !(flags & PA_STREAM_NOT_MONOTONIC),
1108                 TRUE,
1109                 SMOOTHER_MIN_HISTORY,
1110                 x,
1111                 TRUE);
1112     }
1113
1114     if (!dev)
1115         dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1116
1117     t = pa_tagstruct_command(
1118             s->context,
1119             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1120             &tag);
1121
1122     if (s->context->version < 13)
1123         pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1124
1125     pa_tagstruct_put(
1126             t,
1127             PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1128             PA_TAG_CHANNEL_MAP, &s->channel_map,
1129             PA_TAG_U32, PA_INVALID_INDEX,
1130             PA_TAG_STRING, dev,
1131             PA_TAG_U32, s->buffer_attr.maxlength,
1132             PA_TAG_BOOLEAN, s->corked,
1133             PA_TAG_INVALID);
1134
1135     if (s->direction == PA_STREAM_PLAYBACK) {
1136         pa_cvolume cv;
1137
1138         pa_tagstruct_put(
1139                 t,
1140                 PA_TAG_U32, s->buffer_attr.tlength,
1141                 PA_TAG_U32, s->buffer_attr.prebuf,
1142                 PA_TAG_U32, s->buffer_attr.minreq,
1143                 PA_TAG_U32, s->syncid,
1144                 PA_TAG_INVALID);
1145
1146         volume_set = !!volume;
1147
1148         if (!volume)
1149             volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1150
1151         pa_tagstruct_put_cvolume(t, volume);
1152     } else
1153         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1154
1155     if (s->context->version >= 12) {
1156         pa_tagstruct_put(
1157                 t,
1158                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1159                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1160                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1161                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1162                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1163                 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1164                 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1165                 PA_TAG_INVALID);
1166     }
1167
1168     if (s->context->version >= 13) {
1169
1170         if (s->direction == PA_STREAM_PLAYBACK)
1171             pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1172         else
1173             pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1174
1175         pa_tagstruct_put(
1176                 t,
1177                 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1178                 PA_TAG_PROPLIST, s->proplist,
1179                 PA_TAG_INVALID);
1180
1181         if (s->direction == PA_STREAM_RECORD)
1182             pa_tagstruct_putu32(t, s->direct_on_input);
1183     }
1184
1185     if (s->context->version >= 14) {
1186
1187         if (s->direction == PA_STREAM_PLAYBACK)
1188             pa_tagstruct_put_boolean(t, volume_set);
1189
1190         pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1191     }
1192
1193     if (s->context->version >= 15) {
1194
1195         if (s->direction == PA_STREAM_PLAYBACK)
1196             pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1197
1198         pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1199         pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1200     }
1201
1202     if (s->context->version >= 17) {
1203
1204         if (s->direction == PA_STREAM_PLAYBACK)
1205             pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1206
1207     }
1208
1209     pa_pstream_send_tagstruct(s->context->pstream, t);
1210     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1211
1212     pa_stream_set_state(s, PA_STREAM_CREATING);
1213
1214     pa_stream_unref(s);
1215     return 0;
1216 }
1217
1218 int pa_stream_connect_playback(
1219         pa_stream *s,
1220         const char *dev,
1221         const pa_buffer_attr *attr,
1222         pa_stream_flags_t flags,
1223         const pa_cvolume *volume,
1224         pa_stream *sync_stream) {
1225
1226     pa_assert(s);
1227     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1228
1229     return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1230 }
1231
1232 int pa_stream_connect_record(
1233         pa_stream *s,
1234         const char *dev,
1235         const pa_buffer_attr *attr,
1236         pa_stream_flags_t flags) {
1237
1238     pa_assert(s);
1239     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1240
1241     return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1242 }
1243
1244 int pa_stream_begin_write(
1245         pa_stream *s,
1246         void **data,
1247         size_t *nbytes) {
1248
1249     pa_assert(s);
1250     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1251
1252     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1253     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1254     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1255     PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1256     PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1257
1258     if (*nbytes != (size_t) -1) {
1259         size_t m, fs;
1260
1261         m = pa_mempool_block_size_max(s->context->mempool);
1262         fs = pa_frame_size(&s->sample_spec);
1263
1264         m = (m / fs) * fs;
1265         if (*nbytes > m)
1266             *nbytes = m;
1267     }
1268
1269     if (!s->write_memblock) {
1270         s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1271         s->write_data = pa_memblock_acquire(s->write_memblock);
1272     }
1273
1274     *data = s->write_data;
1275     *nbytes = pa_memblock_get_length(s->write_memblock);
1276
1277     return 0;
1278 }
1279
1280 int pa_stream_cancel_write(
1281         pa_stream *s) {
1282
1283     pa_assert(s);
1284     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1285
1286     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1287     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1288     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1289     PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1290
1291     pa_assert(s->write_data);
1292
1293     pa_memblock_release(s->write_memblock);
1294     pa_memblock_unref(s->write_memblock);
1295     s->write_memblock = NULL;
1296     s->write_data = NULL;
1297
1298     return 0;
1299 }
1300
1301 int pa_stream_write(
1302         pa_stream *s,
1303         const void *data,
1304         size_t length,
1305         pa_free_cb_t free_cb,
1306         int64_t offset,
1307         pa_seek_mode_t seek) {
1308
1309     pa_assert(s);
1310     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1311     pa_assert(data);
1312
1313     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1314     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1315     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1316     PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1317     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1318     PA_CHECK_VALIDITY(s->context,
1319                       !s->write_memblock ||
1320                       ((data >= s->write_data) &&
1321                        ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1322                       PA_ERR_INVALID);
1323     PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1324
1325     if (s->write_memblock) {
1326         pa_memchunk chunk;
1327
1328         /* pa_stream_write_begin() was called before */
1329
1330         pa_memblock_release(s->write_memblock);
1331
1332         chunk.memblock = s->write_memblock;
1333         chunk.index = (const char *) data - (const char *) s->write_data;
1334         chunk.length = length;
1335
1336         s->write_memblock = NULL;
1337         s->write_data = NULL;
1338
1339         pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1340         pa_memblock_unref(chunk.memblock);
1341
1342     } else {
1343         pa_seek_mode_t t_seek = seek;
1344         int64_t t_offset = offset;
1345         size_t t_length = length;
1346         const void *t_data = data;
1347
1348         /* pa_stream_write_begin() was not called before */
1349
1350         while (t_length > 0) {
1351             pa_memchunk chunk;
1352
1353             chunk.index = 0;
1354
1355             if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1356                 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1357                 chunk.length = t_length;
1358             } else {
1359                 void *d;
1360
1361                 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1362                 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1363
1364                 d = pa_memblock_acquire(chunk.memblock);
1365                 memcpy(d, t_data, chunk.length);
1366                 pa_memblock_release(chunk.memblock);
1367             }
1368
1369             pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1370
1371             t_offset = 0;
1372             t_seek = PA_SEEK_RELATIVE;
1373
1374             t_data = (const uint8_t*) t_data + chunk.length;
1375             t_length -= chunk.length;
1376
1377             pa_memblock_unref(chunk.memblock);
1378         }
1379
1380         if (free_cb && pa_pstream_get_shm(s->context->pstream))
1381             free_cb((void*) data);
1382     }
1383
1384     /* This is obviously wrong since we ignore the seeking index . But
1385      * that's OK, the server side applies the same error */
1386     s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1387
1388     /* pa_log("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes); */
1389
1390     if (s->direction == PA_STREAM_PLAYBACK) {
1391
1392         /* Update latency request correction */
1393         if (s->write_index_corrections[s->current_write_index_correction].valid) {
1394
1395             if (seek == PA_SEEK_ABSOLUTE) {
1396                 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1397                 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1398                 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1399             } else if (seek == PA_SEEK_RELATIVE) {
1400                 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1401                     s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1402             } else
1403                 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1404         }
1405
1406         /* Update the write index in the already available latency data */
1407         if (s->timing_info_valid) {
1408
1409             if (seek == PA_SEEK_ABSOLUTE) {
1410                 s->timing_info.write_index_corrupt = FALSE;
1411                 s->timing_info.write_index = offset + (int64_t) length;
1412             } else if (seek == PA_SEEK_RELATIVE) {
1413                 if (!s->timing_info.write_index_corrupt)
1414                     s->timing_info.write_index += offset + (int64_t) length;
1415             } else
1416                 s->timing_info.write_index_corrupt = TRUE;
1417         }
1418
1419         if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1420             request_auto_timing_update(s, TRUE);
1421     }
1422
1423     return 0;
1424 }
1425
1426 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1427     pa_assert(s);
1428     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1429     pa_assert(data);
1430     pa_assert(length);
1431
1432     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1433     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1434     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1435
1436     if (!s->peek_memchunk.memblock) {
1437
1438         if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1439             *data = NULL;
1440             *length = 0;
1441             return 0;
1442         }
1443
1444         s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1445     }
1446
1447     pa_assert(s->peek_data);
1448     *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1449     *length = s->peek_memchunk.length;
1450     return 0;
1451 }
1452
1453 int pa_stream_drop(pa_stream *s) {
1454     pa_assert(s);
1455     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1456
1457     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1458     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1459     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1460     PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1461
1462     pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1463
1464     /* Fix the simulated local read index */
1465     if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1466         s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1467
1468     pa_assert(s->peek_data);
1469     pa_memblock_release(s->peek_memchunk.memblock);
1470     pa_memblock_unref(s->peek_memchunk.memblock);
1471     pa_memchunk_reset(&s->peek_memchunk);
1472
1473     return 0;
1474 }
1475
1476 size_t pa_stream_writable_size(pa_stream *s) {
1477     pa_assert(s);
1478     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1479
1480     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1481     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1482     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1483
1484     return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1485 }
1486
1487 size_t pa_stream_readable_size(pa_stream *s) {
1488     pa_assert(s);
1489     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1490
1491     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1492     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1493     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1494
1495     return pa_memblockq_get_length(s->record_memblockq);
1496 }
1497
1498 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1499     pa_operation *o;
1500     pa_tagstruct *t;
1501     uint32_t tag;
1502
1503     pa_assert(s);
1504     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1505
1506     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1507     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1508     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1509
1510     /* Ask for a timing update before we cork/uncork to get the best
1511      * accuracy for the transport latency suitable for the
1512      * check_smoother_status() call in the started callback */
1513     request_auto_timing_update(s, TRUE);
1514
1515     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1516
1517     t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1518     pa_tagstruct_putu32(t, s->channel);
1519     pa_pstream_send_tagstruct(s->context->pstream, t);
1520     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1521
1522     /* This might cause the read index to conitnue again, hence
1523      * let's request a timing update */
1524     request_auto_timing_update(s, TRUE);
1525
1526     return o;
1527 }
1528
1529 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1530     pa_usec_t usec;
1531
1532     pa_assert(s);
1533     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1534     pa_assert(s->state == PA_STREAM_READY);
1535     pa_assert(s->direction != PA_STREAM_UPLOAD);
1536     pa_assert(s->timing_info_valid);
1537     pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1538     pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1539
1540     if (s->direction == PA_STREAM_PLAYBACK) {
1541         /* The last byte that was written into the output device
1542          * had this time value associated */
1543         usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1544
1545         if (!s->corked && !s->suspended) {
1546
1547             if (!ignore_transport)
1548                 /* Because the latency info took a little time to come
1549                  * to us, we assume that the real output time is actually
1550                  * a little ahead */
1551                 usec += s->timing_info.transport_usec;
1552
1553             /* However, the output device usually maintains a buffer
1554                too, hence the real sample currently played is a little
1555                back  */
1556             if (s->timing_info.sink_usec >= usec)
1557                 usec = 0;
1558             else
1559                 usec -= s->timing_info.sink_usec;
1560         }
1561
1562     } else {
1563         pa_assert(s->direction == PA_STREAM_RECORD);
1564
1565         /* The last byte written into the server side queue had
1566          * this time value associated */
1567         usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1568
1569         if (!s->corked && !s->suspended) {
1570
1571             if (!ignore_transport)
1572                 /* Add transport latency */
1573                 usec += s->timing_info.transport_usec;
1574
1575             /* Add latency of data in device buffer */
1576             usec += s->timing_info.source_usec;
1577
1578             /* If this is a monitor source, we need to correct the
1579              * time by the playback device buffer */
1580             if (s->timing_info.sink_usec >= usec)
1581                 usec = 0;
1582             else
1583                 usec -= s->timing_info.sink_usec;
1584         }
1585     }
1586
1587     return usec;
1588 }
1589
1590 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1591     pa_operation *o = userdata;
1592     struct timeval local, remote, now;
1593     pa_timing_info *i;
1594     pa_bool_t playing = FALSE;
1595     uint64_t underrun_for = 0, playing_for = 0;
1596
1597     pa_assert(pd);
1598     pa_assert(o);
1599     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1600
1601     if (!o->context || !o->stream)
1602         goto finish;
1603
1604     i = &o->stream->timing_info;
1605
1606     o->stream->timing_info_valid = FALSE;
1607     i->write_index_corrupt = TRUE;
1608     i->read_index_corrupt = TRUE;
1609
1610     if (command != PA_COMMAND_REPLY) {
1611         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1612             goto finish;
1613
1614     } else {
1615
1616         if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1617             pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1618             pa_tagstruct_get_boolean(t, &playing) < 0 ||
1619             pa_tagstruct_get_timeval(t, &local) < 0 ||
1620             pa_tagstruct_get_timeval(t, &remote) < 0 ||
1621             pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1622             pa_tagstruct_gets64(t, &i->read_index) < 0) {
1623
1624             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1625             goto finish;
1626         }
1627
1628         if (o->context->version >= 13 &&
1629             o->stream->direction == PA_STREAM_PLAYBACK)
1630             if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1631                 pa_tagstruct_getu64(t, &playing_for) < 0) {
1632
1633                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1634                 goto finish;
1635             }
1636
1637
1638         if (!pa_tagstruct_eof(t)) {
1639             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1640             goto finish;
1641         }
1642         o->stream->timing_info_valid = TRUE;
1643         i->write_index_corrupt = FALSE;
1644         i->read_index_corrupt = FALSE;
1645
1646         i->playing = (int) playing;
1647         i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1648
1649         pa_gettimeofday(&now);
1650
1651         /* Calculcate timestamps */
1652         if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1653             /* local and remote seem to have synchronized clocks */
1654
1655             if (o->stream->direction == PA_STREAM_PLAYBACK)
1656                 i->transport_usec = pa_timeval_diff(&remote, &local);
1657             else
1658                 i->transport_usec = pa_timeval_diff(&now, &remote);
1659
1660             i->synchronized_clocks = TRUE;
1661             i->timestamp = remote;
1662         } else {
1663             /* clocks are not synchronized, let's estimate latency then */
1664             i->transport_usec = pa_timeval_diff(&now, &local)/2;
1665             i->synchronized_clocks = FALSE;
1666             i->timestamp = local;
1667             pa_timeval_add(&i->timestamp, i->transport_usec);
1668         }
1669
1670         /* Invalidate read and write indexes if necessary */
1671         if (tag < o->stream->read_index_not_before)
1672             i->read_index_corrupt = TRUE;
1673
1674         if (tag < o->stream->write_index_not_before)
1675             i->write_index_corrupt = TRUE;
1676
1677         if (o->stream->direction == PA_STREAM_PLAYBACK) {
1678             /* Write index correction */
1679
1680             int n, j;
1681             uint32_t ctag = tag;
1682
1683             /* Go through the saved correction values and add up the
1684              * total correction.*/
1685             for (n = 0, j = o->stream->current_write_index_correction+1;
1686                  n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1687                  n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1688
1689                 /* Step over invalid data or out-of-date data */
1690                 if (!o->stream->write_index_corrections[j].valid ||
1691                     o->stream->write_index_corrections[j].tag < ctag)
1692                     continue;
1693
1694                 /* Make sure that everything is in order */
1695                 ctag = o->stream->write_index_corrections[j].tag+1;
1696
1697                 /* Now fix the write index */
1698                 if (o->stream->write_index_corrections[j].corrupt) {
1699                     /* A corrupting seek was made */
1700                     i->write_index_corrupt = TRUE;
1701                 } else if (o->stream->write_index_corrections[j].absolute) {
1702                     /* An absolute seek was made */
1703                     i->write_index = o->stream->write_index_corrections[j].value;
1704                     i->write_index_corrupt = FALSE;
1705                 } else if (!i->write_index_corrupt) {
1706                     /* A relative seek was made */
1707                     i->write_index += o->stream->write_index_corrections[j].value;
1708                 }
1709             }
1710
1711             /* Clear old correction entries */
1712             for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1713                 if (!o->stream->write_index_corrections[n].valid)
1714                     continue;
1715
1716                 if (o->stream->write_index_corrections[n].tag <= tag)
1717                     o->stream->write_index_corrections[n].valid = FALSE;
1718             }
1719         }
1720
1721         if (o->stream->direction == PA_STREAM_RECORD) {
1722             /* Read index correction */
1723
1724             if (!i->read_index_corrupt)
1725                 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1726         }
1727
1728         /* Update smoother */
1729         if (o->stream->smoother) {
1730             pa_usec_t u, x;
1731
1732             u = x = pa_rtclock_now() - i->transport_usec;
1733
1734             if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1735                 pa_usec_t su;
1736
1737                 /* If we weren't playing then it will take some time
1738                  * until the audio will actually come out through the
1739                  * speakers. Since we follow that timing here, we need
1740                  * to try to fix this up */
1741
1742                 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1743
1744                 if (su < i->sink_usec)
1745                     x += i->sink_usec - su;
1746             }
1747
1748             if (!i->playing)
1749                 pa_smoother_pause(o->stream->smoother, x);
1750
1751             /* Update the smoother */
1752             if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1753                 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1754                 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1755
1756             if (i->playing)
1757                 pa_smoother_resume(o->stream->smoother, x, TRUE);
1758         }
1759     }
1760
1761     o->stream->auto_timing_update_requested = FALSE;
1762
1763     if (o->stream->latency_update_callback)
1764         o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1765
1766     if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1767         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1768         cb(o->stream, o->stream->timing_info_valid, o->userdata);
1769     }
1770
1771 finish:
1772
1773     pa_operation_done(o);
1774     pa_operation_unref(o);
1775 }
1776
1777 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1778     uint32_t tag;
1779     pa_operation *o;
1780     pa_tagstruct *t;
1781     struct timeval now;
1782     int cidx = 0;
1783
1784     pa_assert(s);
1785     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1786
1787     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1788     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1789     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1790
1791     if (s->direction == PA_STREAM_PLAYBACK) {
1792         /* Find a place to store the write_index correction data for this entry */
1793         cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1794
1795         /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1796         PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1797     }
1798     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1799
1800     t = pa_tagstruct_command(
1801             s->context,
1802             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1803             &tag);
1804     pa_tagstruct_putu32(t, s->channel);
1805     pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1806
1807     pa_pstream_send_tagstruct(s->context->pstream, t);
1808     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1809
1810     if (s->direction == PA_STREAM_PLAYBACK) {
1811         /* Fill in initial correction data */
1812
1813         s->current_write_index_correction = cidx;
1814
1815         s->write_index_corrections[cidx].valid = TRUE;
1816         s->write_index_corrections[cidx].absolute = FALSE;
1817         s->write_index_corrections[cidx].corrupt = FALSE;
1818         s->write_index_corrections[cidx].tag = tag;
1819         s->write_index_corrections[cidx].value = 0;
1820     }
1821
1822     return o;
1823 }
1824
1825 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1826     pa_stream *s = userdata;
1827
1828     pa_assert(pd);
1829     pa_assert(s);
1830     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1831
1832     pa_stream_ref(s);
1833
1834     if (command != PA_COMMAND_REPLY) {
1835         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1836             goto finish;
1837
1838         pa_stream_set_state(s, PA_STREAM_FAILED);
1839         goto finish;
1840     } else if (!pa_tagstruct_eof(t)) {
1841         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1842         goto finish;
1843     }
1844
1845     pa_stream_set_state(s, PA_STREAM_TERMINATED);
1846
1847 finish:
1848     pa_stream_unref(s);
1849 }
1850
1851 int pa_stream_disconnect(pa_stream *s) {
1852     pa_tagstruct *t;
1853     uint32_t tag;
1854
1855     pa_assert(s);
1856     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1857
1858     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1859     PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1860     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1861
1862     pa_stream_ref(s);
1863
1864     t = pa_tagstruct_command(
1865             s->context,
1866             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1867                         (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1868             &tag);
1869     pa_tagstruct_putu32(t, s->channel);
1870     pa_pstream_send_tagstruct(s->context->pstream, t);
1871     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1872
1873     pa_stream_unref(s);
1874     return 0;
1875 }
1876
1877 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1878     pa_assert(s);
1879     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1880
1881     if (pa_detect_fork())
1882         return;
1883
1884     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1885         return;
1886
1887     s->read_callback = cb;
1888     s->read_userdata = userdata;
1889 }
1890
1891 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1892     pa_assert(s);
1893     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1894
1895     if (pa_detect_fork())
1896         return;
1897
1898     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1899         return;
1900
1901     s->write_callback = cb;
1902     s->write_userdata = userdata;
1903 }
1904
1905 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1906     pa_assert(s);
1907     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1908
1909     if (pa_detect_fork())
1910         return;
1911
1912     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1913         return;
1914
1915     s->state_callback = cb;
1916     s->state_userdata = userdata;
1917 }
1918
1919 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1920     pa_assert(s);
1921     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1922
1923     if (pa_detect_fork())
1924         return;
1925
1926     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1927         return;
1928
1929     s->overflow_callback = cb;
1930     s->overflow_userdata = userdata;
1931 }
1932
1933 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1934     pa_assert(s);
1935     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1936
1937     if (pa_detect_fork())
1938         return;
1939
1940     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1941         return;
1942
1943     s->underflow_callback = cb;
1944     s->underflow_userdata = userdata;
1945 }
1946
1947 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1948     pa_assert(s);
1949     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1950
1951     if (pa_detect_fork())
1952         return;
1953
1954     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1955         return;
1956
1957     s->latency_update_callback = cb;
1958     s->latency_update_userdata = userdata;
1959 }
1960
1961 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1962     pa_assert(s);
1963     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1964
1965     if (pa_detect_fork())
1966         return;
1967
1968     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1969         return;
1970
1971     s->moved_callback = cb;
1972     s->moved_userdata = userdata;
1973 }
1974
1975 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1976     pa_assert(s);
1977     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1978
1979     if (pa_detect_fork())
1980         return;
1981
1982     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1983         return;
1984
1985     s->suspended_callback = cb;
1986     s->suspended_userdata = userdata;
1987 }
1988
1989 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1990     pa_assert(s);
1991     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1992
1993     if (pa_detect_fork())
1994         return;
1995
1996     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1997         return;
1998
1999     s->started_callback = cb;
2000     s->started_userdata = userdata;
2001 }
2002
2003 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
2004     pa_assert(s);
2005     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2006
2007     if (pa_detect_fork())
2008         return;
2009
2010     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2011         return;
2012
2013     s->event_callback = cb;
2014     s->event_userdata = userdata;
2015 }
2016
2017 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2018     pa_assert(s);
2019     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2020
2021     if (pa_detect_fork())
2022         return;
2023
2024     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2025         return;
2026
2027     s->buffer_attr_callback = cb;
2028     s->buffer_attr_userdata = userdata;
2029 }
2030
2031 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2032     pa_operation *o = userdata;
2033     int success = 1;
2034
2035     pa_assert(pd);
2036     pa_assert(o);
2037     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2038
2039     if (!o->context)
2040         goto finish;
2041
2042     if (command != PA_COMMAND_REPLY) {
2043         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2044             goto finish;
2045
2046         success = 0;
2047     } else if (!pa_tagstruct_eof(t)) {
2048         pa_context_fail(o->context, PA_ERR_PROTOCOL);
2049         goto finish;
2050     }
2051
2052     if (o->callback) {
2053         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2054         cb(o->stream, success, o->userdata);
2055     }
2056
2057 finish:
2058     pa_operation_done(o);
2059     pa_operation_unref(o);
2060 }
2061
2062 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2063     pa_operation *o;
2064     pa_tagstruct *t;
2065     uint32_t tag;
2066
2067     pa_assert(s);
2068     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2069
2070     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2071     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2072     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2073
2074     /* Ask for a timing update before we cork/uncork to get the best
2075      * accuracy for the transport latency suitable for the
2076      * check_smoother_status() call in the started callback */
2077     request_auto_timing_update(s, TRUE);
2078
2079     s->corked = b;
2080
2081     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2082
2083     t = pa_tagstruct_command(
2084             s->context,
2085             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2086             &tag);
2087     pa_tagstruct_putu32(t, s->channel);
2088     pa_tagstruct_put_boolean(t, !!b);
2089     pa_pstream_send_tagstruct(s->context->pstream, t);
2090     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2091
2092     check_smoother_status(s, FALSE, FALSE, FALSE);
2093
2094     /* This might cause the indexes to hang/start again, hence let's
2095      * request a timing update, after the cork/uncork, too */
2096     request_auto_timing_update(s, TRUE);
2097
2098     return o;
2099 }
2100
2101 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2102     pa_tagstruct *t;
2103     pa_operation *o;
2104     uint32_t tag;
2105
2106     pa_assert(s);
2107     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2108
2109     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2110     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2111
2112     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2113
2114     t = pa_tagstruct_command(s->context, command, &tag);
2115     pa_tagstruct_putu32(t, s->channel);
2116     pa_pstream_send_tagstruct(s->context->pstream, t);
2117     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2118
2119     return o;
2120 }
2121
2122 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2123     pa_operation *o;
2124
2125     pa_assert(s);
2126     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2127
2128     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2129     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2130     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2131
2132     /* Ask for a timing update *before* the flush, so that the
2133      * transport usec is as up to date as possible when we get the
2134      * underflow message and update the smoother status*/
2135     request_auto_timing_update(s, TRUE);
2136
2137     if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2138         return NULL;
2139
2140     if (s->direction == PA_STREAM_PLAYBACK) {
2141
2142         if (s->write_index_corrections[s->current_write_index_correction].valid)
2143             s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2144
2145         if (s->buffer_attr.prebuf > 0)
2146             check_smoother_status(s, FALSE, FALSE, TRUE);
2147
2148         /* This will change the write index, but leave the
2149          * read index untouched. */
2150         invalidate_indexes(s, FALSE, TRUE);
2151
2152     } else
2153         /* For record streams this has no influence on the write
2154          * index, but the read index might jump. */
2155         invalidate_indexes(s, TRUE, FALSE);
2156
2157     return o;
2158 }
2159
2160 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2161     pa_operation *o;
2162
2163     pa_assert(s);
2164     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2165
2166     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2167     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2168     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2169     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2170
2171     /* Ask for a timing update before we cork/uncork to get the best
2172      * accuracy for the transport latency suitable for the
2173      * check_smoother_status() call in the started callback */
2174     request_auto_timing_update(s, TRUE);
2175
2176     if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2177         return NULL;
2178
2179     /* This might cause the read index to hang again, hence
2180      * let's request a timing update */
2181     request_auto_timing_update(s, TRUE);
2182
2183     return o;
2184 }
2185
2186 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2187     pa_operation *o;
2188
2189     pa_assert(s);
2190     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2191
2192     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2193     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2194     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2195     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2196
2197     /* Ask for a timing update before we cork/uncork to get the best
2198      * accuracy for the transport latency suitable for the
2199      * check_smoother_status() call in the started callback */
2200     request_auto_timing_update(s, TRUE);
2201
2202     if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2203         return NULL;
2204
2205     /* This might cause the read index to start moving again, hence
2206      * let's request a timing update */
2207     request_auto_timing_update(s, TRUE);
2208
2209     return o;
2210 }
2211
2212 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2213     pa_operation *o;
2214
2215     pa_assert(s);
2216     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2217     pa_assert(name);
2218
2219     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2220     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2221     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2222
2223     if (s->context->version >= 13) {
2224         pa_proplist *p = pa_proplist_new();
2225
2226         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2227         o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2228         pa_proplist_free(p);
2229     } else {
2230         pa_tagstruct *t;
2231         uint32_t tag;
2232
2233         o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2234         t = pa_tagstruct_command(
2235                 s->context,
2236                 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2237                 &tag);
2238         pa_tagstruct_putu32(t, s->channel);
2239         pa_tagstruct_puts(t, name);
2240         pa_pstream_send_tagstruct(s->context->pstream, t);
2241         pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2242     }
2243
2244     return o;
2245 }
2246
2247 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2248     pa_usec_t usec;
2249
2250     pa_assert(s);
2251     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2252
2253     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2254     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2255     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2256     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2257     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2258     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2259
2260     if (s->smoother)
2261         usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2262     else
2263         usec = calc_time(s, FALSE);
2264
2265     /* Make sure the time runs monotonically */
2266     if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2267         if (usec < s->previous_time)
2268             usec = s->previous_time;
2269         else
2270             s->previous_time = usec;
2271     }
2272
2273     if (r_usec)
2274         *r_usec = usec;
2275
2276     return 0;
2277 }
2278
2279 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2280     pa_assert(s);
2281     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2282
2283     if (negative)
2284         *negative = 0;
2285
2286     if (a >= b)
2287         return a-b;
2288     else {
2289         if (negative && s->direction == PA_STREAM_RECORD) {
2290             *negative = 1;
2291             return b-a;
2292         } else
2293             return 0;
2294     }
2295 }
2296
2297 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2298     pa_usec_t t, c;
2299     int r;
2300     int64_t cindex;
2301
2302     pa_assert(s);
2303     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2304     pa_assert(r_usec);
2305
2306     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2307     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2308     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2309     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2310     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2311     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2312
2313     if ((r = pa_stream_get_time(s, &t)) < 0)
2314         return r;
2315
2316     if (s->direction == PA_STREAM_PLAYBACK)
2317         cindex = s->timing_info.write_index;
2318     else
2319         cindex = s->timing_info.read_index;
2320
2321     if (cindex < 0)
2322         cindex = 0;
2323
2324     c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2325
2326     if (s->direction == PA_STREAM_PLAYBACK)
2327         *r_usec = time_counter_diff(s, c, t, negative);
2328     else
2329         *r_usec = time_counter_diff(s, t, c, negative);
2330
2331     return 0;
2332 }
2333
2334 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2335     pa_assert(s);
2336     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2337
2338     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2339     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2340     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2341     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2342
2343     return &s->timing_info;
2344 }
2345
2346 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2347     pa_assert(s);
2348     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2349
2350     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2351
2352     return &s->sample_spec;
2353 }
2354
2355 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2356     pa_assert(s);
2357     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2358
2359     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2360
2361     return &s->channel_map;
2362 }
2363
2364 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2365     pa_assert(s);
2366     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2367
2368     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2369     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2370     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2371
2372     return &s->buffer_attr;
2373 }
2374
2375 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2376     pa_operation *o = userdata;
2377     int success = 1;
2378
2379     pa_assert(pd);
2380     pa_assert(o);
2381     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2382
2383     if (!o->context)
2384         goto finish;
2385
2386     if (command != PA_COMMAND_REPLY) {
2387         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2388             goto finish;
2389
2390         success = 0;
2391     } else {
2392         if (o->stream->direction == PA_STREAM_PLAYBACK) {
2393             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2394                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2395                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2396                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2397                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2398                 goto finish;
2399             }
2400         } else if (o->stream->direction == PA_STREAM_RECORD) {
2401             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2402                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2403                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2404                 goto finish;
2405             }
2406         }
2407
2408         if (o->stream->context->version >= 13) {
2409             pa_usec_t usec;
2410
2411             if (pa_tagstruct_get_usec(t, &usec) < 0) {
2412                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2413                 goto finish;
2414             }
2415
2416             if (o->stream->direction == PA_STREAM_RECORD)
2417                 o->stream->timing_info.configured_source_usec = usec;
2418             else
2419                 o->stream->timing_info.configured_sink_usec = usec;
2420         }
2421
2422         if (!pa_tagstruct_eof(t)) {
2423             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2424             goto finish;
2425         }
2426     }
2427
2428     if (o->callback) {
2429         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2430         cb(o->stream, success, o->userdata);
2431     }
2432
2433 finish:
2434     pa_operation_done(o);
2435     pa_operation_unref(o);
2436 }
2437
2438
2439 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2440     pa_operation *o;
2441     pa_tagstruct *t;
2442     uint32_t tag;
2443     pa_buffer_attr copy;
2444
2445     pa_assert(s);
2446     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2447     pa_assert(attr);
2448
2449     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2450     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2451     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2452     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2453
2454     /* Ask for a timing update before we cork/uncork to get the best
2455      * accuracy for the transport latency suitable for the
2456      * check_smoother_status() call in the started callback */
2457     request_auto_timing_update(s, TRUE);
2458
2459     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2460
2461     t = pa_tagstruct_command(
2462             s->context,
2463             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2464             &tag);
2465     pa_tagstruct_putu32(t, s->channel);
2466
2467     copy = *attr;
2468     patch_buffer_attr(s, &copy, NULL);
2469     attr = &copy;
2470
2471     pa_tagstruct_putu32(t, attr->maxlength);
2472
2473     if (s->direction == PA_STREAM_PLAYBACK)
2474         pa_tagstruct_put(
2475                 t,
2476                 PA_TAG_U32, attr->tlength,
2477                 PA_TAG_U32, attr->prebuf,
2478                 PA_TAG_U32, attr->minreq,
2479                 PA_TAG_INVALID);
2480     else
2481         pa_tagstruct_putu32(t, attr->fragsize);
2482
2483     if (s->context->version >= 13)
2484         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2485
2486     if (s->context->version >= 14)
2487         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2488
2489     pa_pstream_send_tagstruct(s->context->pstream, t);
2490     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2491
2492     /* This might cause changes in the read/write indexex, hence let's
2493      * request a timing update */
2494     request_auto_timing_update(s, TRUE);
2495
2496     return o;
2497 }
2498
2499 uint32_t pa_stream_get_device_index(pa_stream *s) {
2500     pa_assert(s);
2501     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2502
2503     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2504     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2505     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2506     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2507     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2508
2509     return s->device_index;
2510 }
2511
2512 const char *pa_stream_get_device_name(pa_stream *s) {
2513     pa_assert(s);
2514     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2515
2516     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2517     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2518     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2519     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2520     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2521
2522     return s->device_name;
2523 }
2524
2525 int pa_stream_is_suspended(pa_stream *s) {
2526     pa_assert(s);
2527     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2528
2529     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2530     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2531     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2532     PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2533
2534     return s->suspended;
2535 }
2536
2537 int pa_stream_is_corked(pa_stream *s) {
2538     pa_assert(s);
2539     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2540
2541     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2542     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2543     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2544
2545     return s->corked;
2546 }
2547
2548 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2549     pa_operation *o = userdata;
2550     int success = 1;
2551
2552     pa_assert(pd);
2553     pa_assert(o);
2554     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2555
2556     if (!o->context)
2557         goto finish;
2558
2559     if (command != PA_COMMAND_REPLY) {
2560         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2561             goto finish;
2562
2563         success = 0;
2564     } else {
2565
2566         if (!pa_tagstruct_eof(t)) {
2567             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2568             goto finish;
2569         }
2570     }
2571
2572     o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2573     pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2574
2575     if (o->callback) {
2576         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2577         cb(o->stream, success, o->userdata);
2578     }
2579
2580 finish:
2581     pa_operation_done(o);
2582     pa_operation_unref(o);
2583 }
2584
2585
2586 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2587     pa_operation *o;
2588     pa_tagstruct *t;
2589     uint32_t tag;
2590
2591     pa_assert(s);
2592     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2593
2594     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2595     PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2596     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2597     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2598     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2599     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2600
2601     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2602     o->private = PA_UINT_TO_PTR(rate);
2603
2604     t = pa_tagstruct_command(
2605             s->context,
2606             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2607             &tag);
2608     pa_tagstruct_putu32(t, s->channel);
2609     pa_tagstruct_putu32(t, rate);
2610
2611     pa_pstream_send_tagstruct(s->context->pstream, t);
2612     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2613
2614     return o;
2615 }
2616
2617 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2618     pa_operation *o;
2619     pa_tagstruct *t;
2620     uint32_t tag;
2621
2622     pa_assert(s);
2623     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2624
2625     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2626     PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2627     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2628     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2629     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2630
2631     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2632
2633     t = pa_tagstruct_command(
2634             s->context,
2635             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2636             &tag);
2637     pa_tagstruct_putu32(t, s->channel);
2638     pa_tagstruct_putu32(t, (uint32_t) mode);
2639     pa_tagstruct_put_proplist(t, p);
2640
2641     pa_pstream_send_tagstruct(s->context->pstream, t);
2642     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2643
2644     /* Please note that we don't update s->proplist here, because we
2645      * don't export that field */
2646
2647     return o;
2648 }
2649
2650 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2651     pa_operation *o;
2652     pa_tagstruct *t;
2653     uint32_t tag;
2654     const char * const*k;
2655
2656     pa_assert(s);
2657     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2658
2659     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2660     PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2661     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2662     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2663     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2664
2665     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2666
2667     t = pa_tagstruct_command(
2668             s->context,
2669             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2670             &tag);
2671     pa_tagstruct_putu32(t, s->channel);
2672
2673     for (k = keys; *k; k++)
2674         pa_tagstruct_puts(t, *k);
2675
2676     pa_tagstruct_puts(t, NULL);
2677
2678     pa_pstream_send_tagstruct(s->context->pstream, t);
2679     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2680
2681     /* Please note that we don't update s->proplist here, because we
2682      * don't export that field */
2683
2684     return o;
2685 }
2686
2687 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2688     pa_assert(s);
2689     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2690
2691     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2692     PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2693     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2694     PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2695
2696     s->direct_on_input = sink_input_idx;
2697
2698     return 0;
2699 }
2700
2701 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2702     pa_assert(s);
2703     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2704
2705     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2706     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2707     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2708
2709     return s->direct_on_input;
2710 }