Merge remote branch 'mkbosmans/rate-adjustment'
[profile/ivi/pulseaudio.git] / src / pulse / stream.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <pulse/def.h>
32 #include <pulse/stream.h>
33 #include <pulse/timeval.h>
34 #include <pulse/rtclock.h>
35 #include <pulse/xmalloc.h>
36
37 #include <pulsecore/pstream-util.h>
38 #include <pulsecore/log.h>
39 #include <pulsecore/hashmap.h>
40 #include <pulsecore/macro.h>
41 #include <pulsecore/core-rtclock.h>
42 #include <pulsecore/core-util.h>
43
44 #include "fork-detect.h"
45 #include "internal.h"
46
47 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
48 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
49
50 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
51 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
52 #define SMOOTHER_MIN_HISTORY (4)
53
54 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
55     return pa_stream_new_with_proplist(c, name, ss, map, NULL);
56 }
57
58 static void reset_callbacks(pa_stream *s) {
59     s->read_callback = NULL;
60     s->read_userdata = NULL;
61     s->write_callback = NULL;
62     s->write_userdata = NULL;
63     s->state_callback = NULL;
64     s->state_userdata = NULL;
65     s->overflow_callback = NULL;
66     s->overflow_userdata = NULL;
67     s->underflow_callback = NULL;
68     s->underflow_userdata = NULL;
69     s->latency_update_callback = NULL;
70     s->latency_update_userdata = NULL;
71     s->moved_callback = NULL;
72     s->moved_userdata = NULL;
73     s->suspended_callback = NULL;
74     s->suspended_userdata = NULL;
75     s->started_callback = NULL;
76     s->started_userdata = NULL;
77     s->event_callback = NULL;
78     s->event_userdata = NULL;
79     s->buffer_attr_callback = NULL;
80     s->buffer_attr_userdata = NULL;
81 }
82
83 pa_stream *pa_stream_new_with_proplist(
84         pa_context *c,
85         const char *name,
86         const pa_sample_spec *ss,
87         const pa_channel_map *map,
88         pa_proplist *p) {
89
90     pa_stream *s;
91     int i;
92     pa_channel_map tmap;
93
94     pa_assert(c);
95     pa_assert(PA_REFCNT_VALUE(c) >= 1);
96
97     PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
98     PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
99     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
100     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
101     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
102     PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
103     PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
104
105     if (!map)
106         PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
107
108     s = pa_xnew(pa_stream, 1);
109     PA_REFCNT_INIT(s);
110     s->context = c;
111     s->mainloop = c->mainloop;
112
113     s->direction = PA_STREAM_NODIRECTION;
114     s->state = PA_STREAM_UNCONNECTED;
115     s->flags = 0;
116
117     s->sample_spec = *ss;
118     s->channel_map = *map;
119
120     s->direct_on_input = PA_INVALID_INDEX;
121
122     s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
123     if (name)
124         pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
125
126     s->channel = 0;
127     s->channel_valid = FALSE;
128     s->syncid = c->csyncid++;
129     s->stream_index = PA_INVALID_INDEX;
130
131     s->requested_bytes = 0;
132     memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
133
134     /* We initialize der target length here, so that if the user
135      * passes no explicit buffering metrics the default is similar to
136      * what older PA versions provided. */
137
138     s->buffer_attr.maxlength = (uint32_t) -1;
139     s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
140     s->buffer_attr.minreq = (uint32_t) -1;
141     s->buffer_attr.prebuf = (uint32_t) -1;
142     s->buffer_attr.fragsize = (uint32_t) -1;
143
144     s->device_index = PA_INVALID_INDEX;
145     s->device_name = NULL;
146     s->suspended = FALSE;
147     s->corked = FALSE;
148
149     s->write_memblock = NULL;
150     s->write_data = NULL;
151
152     pa_memchunk_reset(&s->peek_memchunk);
153     s->peek_data = NULL;
154     s->record_memblockq = NULL;
155
156     memset(&s->timing_info, 0, sizeof(s->timing_info));
157     s->timing_info_valid = FALSE;
158
159     s->previous_time = 0;
160
161     s->read_index_not_before = 0;
162     s->write_index_not_before = 0;
163     for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
164         s->write_index_corrections[i].valid = 0;
165     s->current_write_index_correction = 0;
166
167     s->auto_timing_update_event = NULL;
168     s->auto_timing_update_requested = FALSE;
169     s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
170
171     reset_callbacks(s);
172
173     s->smoother = NULL;
174
175     /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
176     PA_LLIST_PREPEND(pa_stream, c->streams, s);
177     pa_stream_ref(s);
178
179     return s;
180 }
181
182 static void stream_unlink(pa_stream *s) {
183     pa_operation *o, *n;
184     pa_assert(s);
185
186     if (!s->context)
187         return;
188
189     /* Detach from context */
190
191     /* Unref all operatio object that point to us */
192     for (o = s->context->operations; o; o = n) {
193         n = o->next;
194
195         if (o->stream == s)
196             pa_operation_cancel(o);
197     }
198
199     /* Drop all outstanding replies for this stream */
200     if (s->context->pdispatch)
201         pa_pdispatch_unregister_reply(s->context->pdispatch, s);
202
203     if (s->channel_valid) {
204         pa_hashmap_remove((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, PA_UINT32_TO_PTR(s->channel));
205         s->channel = 0;
206         s->channel_valid = FALSE;
207     }
208
209     PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
210     pa_stream_unref(s);
211
212     s->context = NULL;
213
214     if (s->auto_timing_update_event) {
215         pa_assert(s->mainloop);
216         s->mainloop->time_free(s->auto_timing_update_event);
217     }
218
219     reset_callbacks(s);
220 }
221
222 static void stream_free(pa_stream *s) {
223     pa_assert(s);
224
225     stream_unlink(s);
226
227     if (s->write_memblock) {
228         pa_memblock_release(s->write_memblock);
229         pa_memblock_unref(s->write_data);
230     }
231
232     if (s->peek_memchunk.memblock) {
233         if (s->peek_data)
234             pa_memblock_release(s->peek_memchunk.memblock);
235         pa_memblock_unref(s->peek_memchunk.memblock);
236     }
237
238     if (s->record_memblockq)
239         pa_memblockq_free(s->record_memblockq);
240
241     if (s->proplist)
242         pa_proplist_free(s->proplist);
243
244     if (s->smoother)
245         pa_smoother_free(s->smoother);
246
247     pa_xfree(s->device_name);
248     pa_xfree(s);
249 }
250
251 void pa_stream_unref(pa_stream *s) {
252     pa_assert(s);
253     pa_assert(PA_REFCNT_VALUE(s) >= 1);
254
255     if (PA_REFCNT_DEC(s) <= 0)
256         stream_free(s);
257 }
258
259 pa_stream* pa_stream_ref(pa_stream *s) {
260     pa_assert(s);
261     pa_assert(PA_REFCNT_VALUE(s) >= 1);
262
263     PA_REFCNT_INC(s);
264     return s;
265 }
266
267 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
268     pa_assert(s);
269     pa_assert(PA_REFCNT_VALUE(s) >= 1);
270
271     return s->state;
272 }
273
274 pa_context* pa_stream_get_context(pa_stream *s) {
275     pa_assert(s);
276     pa_assert(PA_REFCNT_VALUE(s) >= 1);
277
278     return s->context;
279 }
280
281 uint32_t pa_stream_get_index(pa_stream *s) {
282     pa_assert(s);
283     pa_assert(PA_REFCNT_VALUE(s) >= 1);
284
285     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
286     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
287
288     return s->stream_index;
289 }
290
291 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
292     pa_assert(s);
293     pa_assert(PA_REFCNT_VALUE(s) >= 1);
294
295     if (s->state == st)
296         return;
297
298     pa_stream_ref(s);
299
300     s->state = st;
301
302     if (s->state_callback)
303         s->state_callback(s, s->state_userdata);
304
305     if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
306         stream_unlink(s);
307
308     pa_stream_unref(s);
309 }
310
311 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
312     pa_assert(s);
313     pa_assert(PA_REFCNT_VALUE(s) >= 1);
314
315     if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
316         return;
317
318     if (s->state == PA_STREAM_READY &&
319         (force || !s->auto_timing_update_requested)) {
320         pa_operation *o;
321
322 /*         pa_log("Automatically requesting new timing data"); */
323
324         if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
325             pa_operation_unref(o);
326             s->auto_timing_update_requested = TRUE;
327         }
328     }
329
330     if (s->auto_timing_update_event) {
331         if (force)
332             s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
333
334         pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
335
336         s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
337     }
338 }
339
340 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
341     pa_context *c = userdata;
342     pa_stream *s;
343     uint32_t channel;
344
345     pa_assert(pd);
346     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
347     pa_assert(t);
348     pa_assert(c);
349     pa_assert(PA_REFCNT_VALUE(c) >= 1);
350
351     pa_context_ref(c);
352
353     if (pa_tagstruct_getu32(t, &channel) < 0 ||
354         !pa_tagstruct_eof(t)) {
355         pa_context_fail(c, PA_ERR_PROTOCOL);
356         goto finish;
357     }
358
359     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
360         goto finish;
361
362     if (s->state != PA_STREAM_READY)
363         goto finish;
364
365     pa_context_set_error(c, PA_ERR_KILLED);
366     pa_stream_set_state(s, PA_STREAM_FAILED);
367
368 finish:
369     pa_context_unref(c);
370 }
371
372 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
373     pa_usec_t x;
374
375     pa_assert(s);
376     pa_assert(!force_start || !force_stop);
377
378     if (!s->smoother)
379         return;
380
381     x = pa_rtclock_now();
382
383     if (s->timing_info_valid) {
384         if (aposteriori)
385             x -= s->timing_info.transport_usec;
386         else
387             x += s->timing_info.transport_usec;
388     }
389
390     if (s->suspended || s->corked || force_stop)
391         pa_smoother_pause(s->smoother, x);
392     else if (force_start || s->buffer_attr.prebuf == 0) {
393
394         if (!s->timing_info_valid &&
395             !aposteriori &&
396             !force_start &&
397             !force_stop &&
398             s->context->version >= 13) {
399
400             /* If the server supports STARTED events we take them as
401              * indications when audio really starts/stops playing, if
402              * we don't have any timing info yet -- instead of trying
403              * to be smart and guessing the server time. Otherwise the
404              * unknown transport delay add too much noise to our time
405              * calculations. */
406
407             return;
408         }
409
410         pa_smoother_resume(s->smoother, x, TRUE);
411     }
412
413     /* Please note that we have no idea if playback actually started
414      * if prebuf is non-zero! */
415 }
416
417 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
418     pa_context *c = userdata;
419     pa_stream *s;
420     uint32_t channel;
421     const char *dn;
422     pa_bool_t suspended;
423     uint32_t di;
424     pa_usec_t usec = 0;
425     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
426
427     pa_assert(pd);
428     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
429     pa_assert(t);
430     pa_assert(c);
431     pa_assert(PA_REFCNT_VALUE(c) >= 1);
432
433     pa_context_ref(c);
434
435     if (c->version < 12) {
436         pa_context_fail(c, PA_ERR_PROTOCOL);
437         goto finish;
438     }
439
440     if (pa_tagstruct_getu32(t, &channel) < 0 ||
441         pa_tagstruct_getu32(t, &di) < 0 ||
442         pa_tagstruct_gets(t, &dn) < 0 ||
443         pa_tagstruct_get_boolean(t, &suspended) < 0) {
444         pa_context_fail(c, PA_ERR_PROTOCOL);
445         goto finish;
446     }
447
448     if (c->version >= 13) {
449
450         if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
451             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
452                 pa_tagstruct_getu32(t, &fragsize) < 0 ||
453                 pa_tagstruct_get_usec(t, &usec) < 0) {
454                 pa_context_fail(c, PA_ERR_PROTOCOL);
455                 goto finish;
456             }
457         } else {
458             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
459                 pa_tagstruct_getu32(t, &tlength) < 0 ||
460                 pa_tagstruct_getu32(t, &prebuf) < 0 ||
461                 pa_tagstruct_getu32(t, &minreq) < 0 ||
462                 pa_tagstruct_get_usec(t, &usec) < 0) {
463                 pa_context_fail(c, PA_ERR_PROTOCOL);
464                 goto finish;
465             }
466         }
467     }
468
469     if (!pa_tagstruct_eof(t)) {
470         pa_context_fail(c, PA_ERR_PROTOCOL);
471         goto finish;
472     }
473
474     if (!dn || di == PA_INVALID_INDEX) {
475         pa_context_fail(c, PA_ERR_PROTOCOL);
476         goto finish;
477     }
478
479     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
480         goto finish;
481
482     if (s->state != PA_STREAM_READY)
483         goto finish;
484
485     if (c->version >= 13) {
486         if (s->direction == PA_STREAM_RECORD)
487             s->timing_info.configured_source_usec = usec;
488         else
489             s->timing_info.configured_sink_usec = usec;
490
491         s->buffer_attr.maxlength = maxlength;
492         s->buffer_attr.fragsize = fragsize;
493         s->buffer_attr.tlength = tlength;
494         s->buffer_attr.prebuf = prebuf;
495         s->buffer_attr.minreq = minreq;
496     }
497
498     pa_xfree(s->device_name);
499     s->device_name = pa_xstrdup(dn);
500     s->device_index = di;
501
502     s->suspended = suspended;
503
504     check_smoother_status(s, TRUE, FALSE, FALSE);
505     request_auto_timing_update(s, TRUE);
506
507     if (s->moved_callback)
508         s->moved_callback(s, s->moved_userdata);
509
510 finish:
511     pa_context_unref(c);
512 }
513
514 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
515     pa_context *c = userdata;
516     pa_stream *s;
517     uint32_t channel;
518     pa_usec_t usec = 0;
519     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
520
521     pa_assert(pd);
522     pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
523     pa_assert(t);
524     pa_assert(c);
525     pa_assert(PA_REFCNT_VALUE(c) >= 1);
526
527     pa_context_ref(c);
528
529     if (c->version < 15) {
530         pa_context_fail(c, PA_ERR_PROTOCOL);
531         goto finish;
532     }
533
534     if (pa_tagstruct_getu32(t, &channel) < 0) {
535         pa_context_fail(c, PA_ERR_PROTOCOL);
536         goto finish;
537     }
538
539     if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
540         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
541             pa_tagstruct_getu32(t, &fragsize) < 0 ||
542             pa_tagstruct_get_usec(t, &usec) < 0) {
543             pa_context_fail(c, PA_ERR_PROTOCOL);
544             goto finish;
545         }
546     } else {
547         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
548             pa_tagstruct_getu32(t, &tlength) < 0 ||
549             pa_tagstruct_getu32(t, &prebuf) < 0 ||
550             pa_tagstruct_getu32(t, &minreq) < 0 ||
551             pa_tagstruct_get_usec(t, &usec) < 0) {
552             pa_context_fail(c, PA_ERR_PROTOCOL);
553             goto finish;
554         }
555     }
556
557     if (!pa_tagstruct_eof(t)) {
558         pa_context_fail(c, PA_ERR_PROTOCOL);
559         goto finish;
560     }
561
562     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
563         goto finish;
564
565     if (s->state != PA_STREAM_READY)
566         goto finish;
567
568     if (s->direction == PA_STREAM_RECORD)
569         s->timing_info.configured_source_usec = usec;
570     else
571         s->timing_info.configured_sink_usec = usec;
572
573     s->buffer_attr.maxlength = maxlength;
574     s->buffer_attr.fragsize = fragsize;
575     s->buffer_attr.tlength = tlength;
576     s->buffer_attr.prebuf = prebuf;
577     s->buffer_attr.minreq = minreq;
578
579     request_auto_timing_update(s, TRUE);
580
581     if (s->buffer_attr_callback)
582         s->buffer_attr_callback(s, s->buffer_attr_userdata);
583
584 finish:
585     pa_context_unref(c);
586 }
587
588 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
589     pa_context *c = userdata;
590     pa_stream *s;
591     uint32_t channel;
592     pa_bool_t suspended;
593
594     pa_assert(pd);
595     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
596     pa_assert(t);
597     pa_assert(c);
598     pa_assert(PA_REFCNT_VALUE(c) >= 1);
599
600     pa_context_ref(c);
601
602     if (c->version < 12) {
603         pa_context_fail(c, PA_ERR_PROTOCOL);
604         goto finish;
605     }
606
607     if (pa_tagstruct_getu32(t, &channel) < 0 ||
608         pa_tagstruct_get_boolean(t, &suspended) < 0 ||
609         !pa_tagstruct_eof(t)) {
610         pa_context_fail(c, PA_ERR_PROTOCOL);
611         goto finish;
612     }
613
614     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
615         goto finish;
616
617     if (s->state != PA_STREAM_READY)
618         goto finish;
619
620     s->suspended = suspended;
621
622     check_smoother_status(s, TRUE, FALSE, FALSE);
623     request_auto_timing_update(s, TRUE);
624
625     if (s->suspended_callback)
626         s->suspended_callback(s, s->suspended_userdata);
627
628 finish:
629     pa_context_unref(c);
630 }
631
632 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
633     pa_context *c = userdata;
634     pa_stream *s;
635     uint32_t channel;
636
637     pa_assert(pd);
638     pa_assert(command == PA_COMMAND_STARTED);
639     pa_assert(t);
640     pa_assert(c);
641     pa_assert(PA_REFCNT_VALUE(c) >= 1);
642
643     pa_context_ref(c);
644
645     if (c->version < 13) {
646         pa_context_fail(c, PA_ERR_PROTOCOL);
647         goto finish;
648     }
649
650     if (pa_tagstruct_getu32(t, &channel) < 0 ||
651         !pa_tagstruct_eof(t)) {
652         pa_context_fail(c, PA_ERR_PROTOCOL);
653         goto finish;
654     }
655
656     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
657         goto finish;
658
659     if (s->state != PA_STREAM_READY)
660         goto finish;
661
662     check_smoother_status(s, TRUE, TRUE, FALSE);
663     request_auto_timing_update(s, TRUE);
664
665     if (s->started_callback)
666         s->started_callback(s, s->started_userdata);
667
668 finish:
669     pa_context_unref(c);
670 }
671
672 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
673     pa_context *c = userdata;
674     pa_stream *s;
675     uint32_t channel;
676     pa_proplist *pl = NULL;
677     const char *event;
678
679     pa_assert(pd);
680     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
681     pa_assert(t);
682     pa_assert(c);
683     pa_assert(PA_REFCNT_VALUE(c) >= 1);
684
685     pa_context_ref(c);
686
687     if (c->version < 15) {
688         pa_context_fail(c, PA_ERR_PROTOCOL);
689         goto finish;
690     }
691
692     pl = pa_proplist_new();
693
694     if (pa_tagstruct_getu32(t, &channel) < 0 ||
695         pa_tagstruct_gets(t, &event) < 0 ||
696         pa_tagstruct_get_proplist(t, pl) < 0 ||
697         !pa_tagstruct_eof(t) || !event) {
698         pa_context_fail(c, PA_ERR_PROTOCOL);
699         goto finish;
700     }
701
702     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
703         goto finish;
704
705     if (s->state != PA_STREAM_READY)
706         goto finish;
707
708     if (s->event_callback)
709         s->event_callback(s, event, pl, s->event_userdata);
710
711 finish:
712     pa_context_unref(c);
713
714     if (pl)
715         pa_proplist_free(pl);
716 }
717
718 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
719     pa_stream *s;
720     pa_context *c = userdata;
721     uint32_t bytes, channel;
722
723     pa_assert(pd);
724     pa_assert(command == PA_COMMAND_REQUEST);
725     pa_assert(t);
726     pa_assert(c);
727     pa_assert(PA_REFCNT_VALUE(c) >= 1);
728
729     pa_context_ref(c);
730
731     if (pa_tagstruct_getu32(t, &channel) < 0 ||
732         pa_tagstruct_getu32(t, &bytes) < 0 ||
733         !pa_tagstruct_eof(t)) {
734         pa_context_fail(c, PA_ERR_PROTOCOL);
735         goto finish;
736     }
737
738     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
739         goto finish;
740
741     if (s->state != PA_STREAM_READY)
742         goto finish;
743
744     s->requested_bytes += bytes;
745
746     /* pa_log("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes); */
747
748     if (s->requested_bytes > 0 && s->write_callback)
749         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
750
751 finish:
752     pa_context_unref(c);
753 }
754
755 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
756     pa_stream *s;
757     pa_context *c = userdata;
758     uint32_t channel;
759
760     pa_assert(pd);
761     pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
762     pa_assert(t);
763     pa_assert(c);
764     pa_assert(PA_REFCNT_VALUE(c) >= 1);
765
766     pa_context_ref(c);
767
768     if (pa_tagstruct_getu32(t, &channel) < 0 ||
769         !pa_tagstruct_eof(t)) {
770         pa_context_fail(c, PA_ERR_PROTOCOL);
771         goto finish;
772     }
773
774     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
775         goto finish;
776
777     if (s->state != PA_STREAM_READY)
778         goto finish;
779
780     if (s->buffer_attr.prebuf > 0)
781         check_smoother_status(s, TRUE, FALSE, TRUE);
782
783     request_auto_timing_update(s, TRUE);
784
785     if (command == PA_COMMAND_OVERFLOW) {
786         if (s->overflow_callback)
787             s->overflow_callback(s, s->overflow_userdata);
788     } else if (command == PA_COMMAND_UNDERFLOW) {
789         if (s->underflow_callback)
790             s->underflow_callback(s, s->underflow_userdata);
791     }
792
793  finish:
794     pa_context_unref(c);
795 }
796
797 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
798     pa_assert(s);
799     pa_assert(PA_REFCNT_VALUE(s) >= 1);
800
801 /*     pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
802
803     if (s->state != PA_STREAM_READY)
804         return;
805
806     if (w) {
807         s->write_index_not_before = s->context->ctag;
808
809         if (s->timing_info_valid)
810             s->timing_info.write_index_corrupt = TRUE;
811
812 /*         pa_log("write_index invalidated"); */
813     }
814
815     if (r) {
816         s->read_index_not_before = s->context->ctag;
817
818         if (s->timing_info_valid)
819             s->timing_info.read_index_corrupt = TRUE;
820
821 /*         pa_log("read_index invalidated"); */
822     }
823
824     request_auto_timing_update(s, TRUE);
825 }
826
827 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
828     pa_stream *s = userdata;
829
830     pa_assert(s);
831     pa_assert(PA_REFCNT_VALUE(s) >= 1);
832
833     pa_stream_ref(s);
834     request_auto_timing_update(s, FALSE);
835     pa_stream_unref(s);
836 }
837
838 static void create_stream_complete(pa_stream *s) {
839     pa_assert(s);
840     pa_assert(PA_REFCNT_VALUE(s) >= 1);
841     pa_assert(s->state == PA_STREAM_CREATING);
842
843     pa_stream_set_state(s, PA_STREAM_READY);
844
845     if (s->requested_bytes > 0 && s->write_callback)
846         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
847
848     if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
849         s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
850         pa_assert(!s->auto_timing_update_event);
851         s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
852
853         request_auto_timing_update(s, TRUE);
854     }
855
856     check_smoother_status(s, TRUE, FALSE, FALSE);
857 }
858
859 static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
860     const char *e;
861
862     pa_assert(s);
863     pa_assert(attr);
864
865     if ((e = getenv("PULSE_LATENCY_MSEC"))) {
866         uint32_t ms;
867
868         if (pa_atou(e, &ms) < 0 || ms <= 0)
869             pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
870         else {
871             attr->maxlength = (uint32_t) -1;
872             attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &s->sample_spec);
873             attr->minreq = (uint32_t) -1;
874             attr->prebuf = (uint32_t) -1;
875             attr->fragsize = attr->tlength;
876         }
877
878         if (flags)
879             *flags |= PA_STREAM_ADJUST_LATENCY;
880     }
881
882     if (s->context->version >= 13)
883         return;
884
885     /* Version older than 0.9.10 didn't do server side buffer_attr
886      * selection, hence we have to fake it on the client side. */
887
888     /* We choose fairly conservative values here, to not confuse
889      * old clients with extremely large playback buffers */
890
891     if (attr->maxlength == (uint32_t) -1)
892         attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
893
894     if (attr->tlength == (uint32_t) -1)
895         attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */
896
897     if (attr->minreq == (uint32_t) -1)
898         attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
899
900     if (attr->prebuf == (uint32_t) -1)
901         attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
902
903     if (attr->fragsize == (uint32_t) -1)
904         attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
905 }
906
907 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
908     pa_stream *s = userdata;
909     uint32_t requested_bytes = 0;
910
911     pa_assert(pd);
912     pa_assert(s);
913     pa_assert(PA_REFCNT_VALUE(s) >= 1);
914     pa_assert(s->state == PA_STREAM_CREATING);
915
916     pa_stream_ref(s);
917
918     if (command != PA_COMMAND_REPLY) {
919         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
920             goto finish;
921
922         pa_stream_set_state(s, PA_STREAM_FAILED);
923         goto finish;
924     }
925
926     if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
927         s->channel == PA_INVALID_INDEX ||
928         ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 ||  s->stream_index == PA_INVALID_INDEX)) ||
929         ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
930         pa_context_fail(s->context, PA_ERR_PROTOCOL);
931         goto finish;
932     }
933
934     s->requested_bytes = (int64_t) requested_bytes;
935
936     if (s->context->version >= 9) {
937         if (s->direction == PA_STREAM_PLAYBACK) {
938             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
939                 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
940                 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
941                 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
942                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
943                 goto finish;
944             }
945         } else if (s->direction == PA_STREAM_RECORD) {
946             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
947                 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
948                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
949                 goto finish;
950             }
951         }
952     }
953
954     if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
955         pa_sample_spec ss;
956         pa_channel_map cm;
957         const char *dn = NULL;
958         pa_bool_t suspended;
959
960         if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
961             pa_tagstruct_get_channel_map(t, &cm) < 0 ||
962             pa_tagstruct_getu32(t, &s->device_index) < 0 ||
963             pa_tagstruct_gets(t, &dn) < 0 ||
964             pa_tagstruct_get_boolean(t, &suspended) < 0) {
965             pa_context_fail(s->context, PA_ERR_PROTOCOL);
966             goto finish;
967         }
968
969         if (!dn || s->device_index == PA_INVALID_INDEX ||
970             ss.channels != cm.channels ||
971             !pa_channel_map_valid(&cm) ||
972             !pa_sample_spec_valid(&ss) ||
973             (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
974             (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
975             (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))) {
976             pa_context_fail(s->context, PA_ERR_PROTOCOL);
977             goto finish;
978         }
979
980         pa_xfree(s->device_name);
981         s->device_name = pa_xstrdup(dn);
982         s->suspended = suspended;
983
984         s->channel_map = cm;
985         s->sample_spec = ss;
986     }
987
988     if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
989         pa_usec_t usec;
990
991         if (pa_tagstruct_get_usec(t, &usec) < 0) {
992             pa_context_fail(s->context, PA_ERR_PROTOCOL);
993             goto finish;
994         }
995
996         if (s->direction == PA_STREAM_RECORD)
997             s->timing_info.configured_source_usec = usec;
998         else
999             s->timing_info.configured_sink_usec = usec;
1000     }
1001
1002     if (!pa_tagstruct_eof(t)) {
1003         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1004         goto finish;
1005     }
1006
1007     if (s->direction == PA_STREAM_RECORD) {
1008         pa_assert(!s->record_memblockq);
1009
1010         s->record_memblockq = pa_memblockq_new(
1011                 0,
1012                 s->buffer_attr.maxlength,
1013                 0,
1014                 pa_frame_size(&s->sample_spec),
1015                 1,
1016                 0,
1017                 0,
1018                 NULL);
1019     }
1020
1021     s->channel_valid = TRUE;
1022     pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);
1023
1024     create_stream_complete(s);
1025
1026 finish:
1027     pa_stream_unref(s);
1028 }
1029
1030 static int create_stream(
1031         pa_stream_direction_t direction,
1032         pa_stream *s,
1033         const char *dev,
1034         const pa_buffer_attr *attr,
1035         pa_stream_flags_t flags,
1036         const pa_cvolume *volume,
1037         pa_stream *sync_stream) {
1038
1039     pa_tagstruct *t;
1040     uint32_t tag;
1041     pa_bool_t volume_set = FALSE;
1042
1043     pa_assert(s);
1044     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1045     pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1046
1047     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1048     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1049     PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1050     PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1051                                               PA_STREAM_INTERPOLATE_TIMING|
1052                                               PA_STREAM_NOT_MONOTONIC|
1053                                               PA_STREAM_AUTO_TIMING_UPDATE|
1054                                               PA_STREAM_NO_REMAP_CHANNELS|
1055                                               PA_STREAM_NO_REMIX_CHANNELS|
1056                                               PA_STREAM_FIX_FORMAT|
1057                                               PA_STREAM_FIX_RATE|
1058                                               PA_STREAM_FIX_CHANNELS|
1059                                               PA_STREAM_DONT_MOVE|
1060                                               PA_STREAM_VARIABLE_RATE|
1061                                               PA_STREAM_PEAK_DETECT|
1062                                               PA_STREAM_START_MUTED|
1063                                               PA_STREAM_ADJUST_LATENCY|
1064                                               PA_STREAM_EARLY_REQUESTS|
1065                                               PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1066                                               PA_STREAM_START_UNMUTED|
1067                                               PA_STREAM_FAIL_ON_SUSPEND|
1068                                               PA_STREAM_RELATIVE_VOLUME|
1069                                               PA_STREAM_PASSTHROUGH)), PA_ERR_INVALID);
1070
1071
1072     PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1073     PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1074     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1075     /* Althought some of the other flags are not supported on older
1076      * version, we don't check for them here, because it doesn't hurt
1077      * when they are passed but actually not supported. This makes
1078      * client development easier */
1079
1080     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1081     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1082     PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
1083     PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1084     PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1085
1086     pa_stream_ref(s);
1087
1088     s->direction = direction;
1089
1090     if (sync_stream)
1091         s->syncid = sync_stream->syncid;
1092
1093     if (attr)
1094         s->buffer_attr = *attr;
1095     patch_buffer_attr(s, &s->buffer_attr, &flags);
1096
1097     s->flags = flags;
1098     s->corked = !!(flags & PA_STREAM_START_CORKED);
1099
1100     if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1101         pa_usec_t x;
1102
1103         x = pa_rtclock_now();
1104
1105         pa_assert(!s->smoother);
1106         s->smoother = pa_smoother_new(
1107                 SMOOTHER_ADJUST_TIME,
1108                 SMOOTHER_HISTORY_TIME,
1109                 !(flags & PA_STREAM_NOT_MONOTONIC),
1110                 TRUE,
1111                 SMOOTHER_MIN_HISTORY,
1112                 x,
1113                 TRUE);
1114     }
1115
1116     if (!dev)
1117         dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1118
1119     t = pa_tagstruct_command(
1120             s->context,
1121             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1122             &tag);
1123
1124     if (s->context->version < 13)
1125         pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1126
1127     pa_tagstruct_put(
1128             t,
1129             PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1130             PA_TAG_CHANNEL_MAP, &s->channel_map,
1131             PA_TAG_U32, PA_INVALID_INDEX,
1132             PA_TAG_STRING, dev,
1133             PA_TAG_U32, s->buffer_attr.maxlength,
1134             PA_TAG_BOOLEAN, s->corked,
1135             PA_TAG_INVALID);
1136
1137     if (s->direction == PA_STREAM_PLAYBACK) {
1138         pa_cvolume cv;
1139
1140         pa_tagstruct_put(
1141                 t,
1142                 PA_TAG_U32, s->buffer_attr.tlength,
1143                 PA_TAG_U32, s->buffer_attr.prebuf,
1144                 PA_TAG_U32, s->buffer_attr.minreq,
1145                 PA_TAG_U32, s->syncid,
1146                 PA_TAG_INVALID);
1147
1148         volume_set = !!volume;
1149
1150         if (!volume)
1151             volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1152
1153         pa_tagstruct_put_cvolume(t, volume);
1154     } else
1155         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1156
1157     if (s->context->version >= 12) {
1158         pa_tagstruct_put(
1159                 t,
1160                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1161                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1162                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1163                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1164                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1165                 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1166                 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1167                 PA_TAG_INVALID);
1168     }
1169
1170     if (s->context->version >= 13) {
1171
1172         if (s->direction == PA_STREAM_PLAYBACK)
1173             pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1174         else
1175             pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1176
1177         pa_tagstruct_put(
1178                 t,
1179                 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1180                 PA_TAG_PROPLIST, s->proplist,
1181                 PA_TAG_INVALID);
1182
1183         if (s->direction == PA_STREAM_RECORD)
1184             pa_tagstruct_putu32(t, s->direct_on_input);
1185     }
1186
1187     if (s->context->version >= 14) {
1188
1189         if (s->direction == PA_STREAM_PLAYBACK)
1190             pa_tagstruct_put_boolean(t, volume_set);
1191
1192         pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1193     }
1194
1195     if (s->context->version >= 15) {
1196
1197         if (s->direction == PA_STREAM_PLAYBACK)
1198             pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1199
1200         pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1201         pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1202     }
1203
1204     if (s->context->version >= 17) {
1205
1206         if (s->direction == PA_STREAM_PLAYBACK)
1207             pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1208
1209     }
1210
1211     if (s->context->version >= 18) {
1212
1213         if (s->direction == PA_STREAM_PLAYBACK)
1214             pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1215     }
1216
1217     pa_pstream_send_tagstruct(s->context->pstream, t);
1218     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1219
1220     pa_stream_set_state(s, PA_STREAM_CREATING);
1221
1222     pa_stream_unref(s);
1223     return 0;
1224 }
1225
1226 int pa_stream_connect_playback(
1227         pa_stream *s,
1228         const char *dev,
1229         const pa_buffer_attr *attr,
1230         pa_stream_flags_t flags,
1231         const pa_cvolume *volume,
1232         pa_stream *sync_stream) {
1233
1234     pa_assert(s);
1235     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1236
1237     return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1238 }
1239
1240 int pa_stream_connect_record(
1241         pa_stream *s,
1242         const char *dev,
1243         const pa_buffer_attr *attr,
1244         pa_stream_flags_t flags) {
1245
1246     pa_assert(s);
1247     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1248
1249     return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1250 }
1251
1252 int pa_stream_begin_write(
1253         pa_stream *s,
1254         void **data,
1255         size_t *nbytes) {
1256
1257     pa_assert(s);
1258     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1259
1260     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1261     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1262     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1263     PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1264     PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1265
1266     if (*nbytes != (size_t) -1) {
1267         size_t m, fs;
1268
1269         m = pa_mempool_block_size_max(s->context->mempool);
1270         fs = pa_frame_size(&s->sample_spec);
1271
1272         m = (m / fs) * fs;
1273         if (*nbytes > m)
1274             *nbytes = m;
1275     }
1276
1277     if (!s->write_memblock) {
1278         s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1279         s->write_data = pa_memblock_acquire(s->write_memblock);
1280     }
1281
1282     *data = s->write_data;
1283     *nbytes = pa_memblock_get_length(s->write_memblock);
1284
1285     return 0;
1286 }
1287
1288 int pa_stream_cancel_write(
1289         pa_stream *s) {
1290
1291     pa_assert(s);
1292     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1293
1294     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1295     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1296     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1297     PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1298
1299     pa_assert(s->write_data);
1300
1301     pa_memblock_release(s->write_memblock);
1302     pa_memblock_unref(s->write_memblock);
1303     s->write_memblock = NULL;
1304     s->write_data = NULL;
1305
1306     return 0;
1307 }
1308
1309 int pa_stream_write(
1310         pa_stream *s,
1311         const void *data,
1312         size_t length,
1313         pa_free_cb_t free_cb,
1314         int64_t offset,
1315         pa_seek_mode_t seek) {
1316
1317     pa_assert(s);
1318     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1319     pa_assert(data);
1320
1321     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1322     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1323     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1324     PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1325     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1326     PA_CHECK_VALIDITY(s->context,
1327                       !s->write_memblock ||
1328                       ((data >= s->write_data) &&
1329                        ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1330                       PA_ERR_INVALID);
1331     PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1332
1333     if (s->write_memblock) {
1334         pa_memchunk chunk;
1335
1336         /* pa_stream_write_begin() was called before */
1337
1338         pa_memblock_release(s->write_memblock);
1339
1340         chunk.memblock = s->write_memblock;
1341         chunk.index = (const char *) data - (const char *) s->write_data;
1342         chunk.length = length;
1343
1344         s->write_memblock = NULL;
1345         s->write_data = NULL;
1346
1347         pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1348         pa_memblock_unref(chunk.memblock);
1349
1350     } else {
1351         pa_seek_mode_t t_seek = seek;
1352         int64_t t_offset = offset;
1353         size_t t_length = length;
1354         const void *t_data = data;
1355
1356         /* pa_stream_write_begin() was not called before */
1357
1358         while (t_length > 0) {
1359             pa_memchunk chunk;
1360
1361             chunk.index = 0;
1362
1363             if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1364                 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1365                 chunk.length = t_length;
1366             } else {
1367                 void *d;
1368
1369                 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1370                 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1371
1372                 d = pa_memblock_acquire(chunk.memblock);
1373                 memcpy(d, t_data, chunk.length);
1374                 pa_memblock_release(chunk.memblock);
1375             }
1376
1377             pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1378
1379             t_offset = 0;
1380             t_seek = PA_SEEK_RELATIVE;
1381
1382             t_data = (const uint8_t*) t_data + chunk.length;
1383             t_length -= chunk.length;
1384
1385             pa_memblock_unref(chunk.memblock);
1386         }
1387
1388         if (free_cb && pa_pstream_get_shm(s->context->pstream))
1389             free_cb((void*) data);
1390     }
1391
1392     /* This is obviously wrong since we ignore the seeking index . But
1393      * that's OK, the server side applies the same error */
1394     s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1395
1396     /* pa_log("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes); */
1397
1398     if (s->direction == PA_STREAM_PLAYBACK) {
1399
1400         /* Update latency request correction */
1401         if (s->write_index_corrections[s->current_write_index_correction].valid) {
1402
1403             if (seek == PA_SEEK_ABSOLUTE) {
1404                 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1405                 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1406                 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1407             } else if (seek == PA_SEEK_RELATIVE) {
1408                 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1409                     s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1410             } else
1411                 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1412         }
1413
1414         /* Update the write index in the already available latency data */
1415         if (s->timing_info_valid) {
1416
1417             if (seek == PA_SEEK_ABSOLUTE) {
1418                 s->timing_info.write_index_corrupt = FALSE;
1419                 s->timing_info.write_index = offset + (int64_t) length;
1420             } else if (seek == PA_SEEK_RELATIVE) {
1421                 if (!s->timing_info.write_index_corrupt)
1422                     s->timing_info.write_index += offset + (int64_t) length;
1423             } else
1424                 s->timing_info.write_index_corrupt = TRUE;
1425         }
1426
1427         if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1428             request_auto_timing_update(s, TRUE);
1429     }
1430
1431     return 0;
1432 }
1433
1434 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1435     pa_assert(s);
1436     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1437     pa_assert(data);
1438     pa_assert(length);
1439
1440     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1441     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1442     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1443
1444     if (!s->peek_memchunk.memblock) {
1445
1446         if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1447             *data = NULL;
1448             *length = 0;
1449             return 0;
1450         }
1451
1452         s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1453     }
1454
1455     pa_assert(s->peek_data);
1456     *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1457     *length = s->peek_memchunk.length;
1458     return 0;
1459 }
1460
1461 int pa_stream_drop(pa_stream *s) {
1462     pa_assert(s);
1463     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1464
1465     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1466     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1467     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1468     PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1469
1470     pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1471
1472     /* Fix the simulated local read index */
1473     if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1474         s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1475
1476     pa_assert(s->peek_data);
1477     pa_memblock_release(s->peek_memchunk.memblock);
1478     pa_memblock_unref(s->peek_memchunk.memblock);
1479     pa_memchunk_reset(&s->peek_memchunk);
1480
1481     return 0;
1482 }
1483
1484 size_t pa_stream_writable_size(pa_stream *s) {
1485     pa_assert(s);
1486     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1487
1488     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1489     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1490     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1491
1492     return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1493 }
1494
1495 size_t pa_stream_readable_size(pa_stream *s) {
1496     pa_assert(s);
1497     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1498
1499     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1500     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1501     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1502
1503     return pa_memblockq_get_length(s->record_memblockq);
1504 }
1505
1506 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1507     pa_operation *o;
1508     pa_tagstruct *t;
1509     uint32_t tag;
1510
1511     pa_assert(s);
1512     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1513
1514     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1515     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1516     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1517
1518     /* Ask for a timing update before we cork/uncork to get the best
1519      * accuracy for the transport latency suitable for the
1520      * check_smoother_status() call in the started callback */
1521     request_auto_timing_update(s, TRUE);
1522
1523     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1524
1525     t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1526     pa_tagstruct_putu32(t, s->channel);
1527     pa_pstream_send_tagstruct(s->context->pstream, t);
1528     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1529
1530     /* This might cause the read index to conitnue again, hence
1531      * let's request a timing update */
1532     request_auto_timing_update(s, TRUE);
1533
1534     return o;
1535 }
1536
1537 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1538     pa_usec_t usec;
1539
1540     pa_assert(s);
1541     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1542     pa_assert(s->state == PA_STREAM_READY);
1543     pa_assert(s->direction != PA_STREAM_UPLOAD);
1544     pa_assert(s->timing_info_valid);
1545     pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1546     pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1547
1548     if (s->direction == PA_STREAM_PLAYBACK) {
1549         /* The last byte that was written into the output device
1550          * had this time value associated */
1551         usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1552
1553         if (!s->corked && !s->suspended) {
1554
1555             if (!ignore_transport)
1556                 /* Because the latency info took a little time to come
1557                  * to us, we assume that the real output time is actually
1558                  * a little ahead */
1559                 usec += s->timing_info.transport_usec;
1560
1561             /* However, the output device usually maintains a buffer
1562                too, hence the real sample currently played is a little
1563                back  */
1564             if (s->timing_info.sink_usec >= usec)
1565                 usec = 0;
1566             else
1567                 usec -= s->timing_info.sink_usec;
1568         }
1569
1570     } else {
1571         pa_assert(s->direction == PA_STREAM_RECORD);
1572
1573         /* The last byte written into the server side queue had
1574          * this time value associated */
1575         usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1576
1577         if (!s->corked && !s->suspended) {
1578
1579             if (!ignore_transport)
1580                 /* Add transport latency */
1581                 usec += s->timing_info.transport_usec;
1582
1583             /* Add latency of data in device buffer */
1584             usec += s->timing_info.source_usec;
1585
1586             /* If this is a monitor source, we need to correct the
1587              * time by the playback device buffer */
1588             if (s->timing_info.sink_usec >= usec)
1589                 usec = 0;
1590             else
1591                 usec -= s->timing_info.sink_usec;
1592         }
1593     }
1594
1595     return usec;
1596 }
1597
1598 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1599     pa_operation *o = userdata;
1600     struct timeval local, remote, now;
1601     pa_timing_info *i;
1602     pa_bool_t playing = FALSE;
1603     uint64_t underrun_for = 0, playing_for = 0;
1604
1605     pa_assert(pd);
1606     pa_assert(o);
1607     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1608
1609     if (!o->context || !o->stream)
1610         goto finish;
1611
1612     i = &o->stream->timing_info;
1613
1614     o->stream->timing_info_valid = FALSE;
1615     i->write_index_corrupt = TRUE;
1616     i->read_index_corrupt = TRUE;
1617
1618     if (command != PA_COMMAND_REPLY) {
1619         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1620             goto finish;
1621
1622     } else {
1623
1624         if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1625             pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1626             pa_tagstruct_get_boolean(t, &playing) < 0 ||
1627             pa_tagstruct_get_timeval(t, &local) < 0 ||
1628             pa_tagstruct_get_timeval(t, &remote) < 0 ||
1629             pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1630             pa_tagstruct_gets64(t, &i->read_index) < 0) {
1631
1632             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1633             goto finish;
1634         }
1635
1636         if (o->context->version >= 13 &&
1637             o->stream->direction == PA_STREAM_PLAYBACK)
1638             if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1639                 pa_tagstruct_getu64(t, &playing_for) < 0) {
1640
1641                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1642                 goto finish;
1643             }
1644
1645
1646         if (!pa_tagstruct_eof(t)) {
1647             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1648             goto finish;
1649         }
1650         o->stream->timing_info_valid = TRUE;
1651         i->write_index_corrupt = FALSE;
1652         i->read_index_corrupt = FALSE;
1653
1654         i->playing = (int) playing;
1655         i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1656
1657         pa_gettimeofday(&now);
1658
1659         /* Calculcate timestamps */
1660         if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1661             /* local and remote seem to have synchronized clocks */
1662
1663             if (o->stream->direction == PA_STREAM_PLAYBACK)
1664                 i->transport_usec = pa_timeval_diff(&remote, &local);
1665             else
1666                 i->transport_usec = pa_timeval_diff(&now, &remote);
1667
1668             i->synchronized_clocks = TRUE;
1669             i->timestamp = remote;
1670         } else {
1671             /* clocks are not synchronized, let's estimate latency then */
1672             i->transport_usec = pa_timeval_diff(&now, &local)/2;
1673             i->synchronized_clocks = FALSE;
1674             i->timestamp = local;
1675             pa_timeval_add(&i->timestamp, i->transport_usec);
1676         }
1677
1678         /* Invalidate read and write indexes if necessary */
1679         if (tag < o->stream->read_index_not_before)
1680             i->read_index_corrupt = TRUE;
1681
1682         if (tag < o->stream->write_index_not_before)
1683             i->write_index_corrupt = TRUE;
1684
1685         if (o->stream->direction == PA_STREAM_PLAYBACK) {
1686             /* Write index correction */
1687
1688             int n, j;
1689             uint32_t ctag = tag;
1690
1691             /* Go through the saved correction values and add up the
1692              * total correction.*/
1693             for (n = 0, j = o->stream->current_write_index_correction+1;
1694                  n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1695                  n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1696
1697                 /* Step over invalid data or out-of-date data */
1698                 if (!o->stream->write_index_corrections[j].valid ||
1699                     o->stream->write_index_corrections[j].tag < ctag)
1700                     continue;
1701
1702                 /* Make sure that everything is in order */
1703                 ctag = o->stream->write_index_corrections[j].tag+1;
1704
1705                 /* Now fix the write index */
1706                 if (o->stream->write_index_corrections[j].corrupt) {
1707                     /* A corrupting seek was made */
1708                     i->write_index_corrupt = TRUE;
1709                 } else if (o->stream->write_index_corrections[j].absolute) {
1710                     /* An absolute seek was made */
1711                     i->write_index = o->stream->write_index_corrections[j].value;
1712                     i->write_index_corrupt = FALSE;
1713                 } else if (!i->write_index_corrupt) {
1714                     /* A relative seek was made */
1715                     i->write_index += o->stream->write_index_corrections[j].value;
1716                 }
1717             }
1718
1719             /* Clear old correction entries */
1720             for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1721                 if (!o->stream->write_index_corrections[n].valid)
1722                     continue;
1723
1724                 if (o->stream->write_index_corrections[n].tag <= tag)
1725                     o->stream->write_index_corrections[n].valid = FALSE;
1726             }
1727         }
1728
1729         if (o->stream->direction == PA_STREAM_RECORD) {
1730             /* Read index correction */
1731
1732             if (!i->read_index_corrupt)
1733                 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1734         }
1735
1736         /* Update smoother if we're not corked */
1737         if (o->stream->smoother && !o->stream->corked) {
1738             pa_usec_t u, x;
1739
1740             u = x = pa_rtclock_now() - i->transport_usec;
1741
1742             if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1743                 pa_usec_t su;
1744
1745                 /* If we weren't playing then it will take some time
1746                  * until the audio will actually come out through the
1747                  * speakers. Since we follow that timing here, we need
1748                  * to try to fix this up */
1749
1750                 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1751
1752                 if (su < i->sink_usec)
1753                     x += i->sink_usec - su;
1754             }
1755
1756             if (!i->playing)
1757                 pa_smoother_pause(o->stream->smoother, x);
1758
1759             /* Update the smoother */
1760             if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1761                 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1762                 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1763
1764             if (i->playing)
1765                 pa_smoother_resume(o->stream->smoother, x, TRUE);
1766         }
1767     }
1768
1769     o->stream->auto_timing_update_requested = FALSE;
1770
1771     if (o->stream->latency_update_callback)
1772         o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1773
1774     if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1775         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1776         cb(o->stream, o->stream->timing_info_valid, o->userdata);
1777     }
1778
1779 finish:
1780
1781     pa_operation_done(o);
1782     pa_operation_unref(o);
1783 }
1784
1785 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1786     uint32_t tag;
1787     pa_operation *o;
1788     pa_tagstruct *t;
1789     struct timeval now;
1790     int cidx = 0;
1791
1792     pa_assert(s);
1793     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1794
1795     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1796     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1797     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1798
1799     if (s->direction == PA_STREAM_PLAYBACK) {
1800         /* Find a place to store the write_index correction data for this entry */
1801         cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1802
1803         /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1804         PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1805     }
1806     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1807
1808     t = pa_tagstruct_command(
1809             s->context,
1810             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1811             &tag);
1812     pa_tagstruct_putu32(t, s->channel);
1813     pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1814
1815     pa_pstream_send_tagstruct(s->context->pstream, t);
1816     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1817
1818     if (s->direction == PA_STREAM_PLAYBACK) {
1819         /* Fill in initial correction data */
1820
1821         s->current_write_index_correction = cidx;
1822
1823         s->write_index_corrections[cidx].valid = TRUE;
1824         s->write_index_corrections[cidx].absolute = FALSE;
1825         s->write_index_corrections[cidx].corrupt = FALSE;
1826         s->write_index_corrections[cidx].tag = tag;
1827         s->write_index_corrections[cidx].value = 0;
1828     }
1829
1830     return o;
1831 }
1832
1833 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1834     pa_stream *s = userdata;
1835
1836     pa_assert(pd);
1837     pa_assert(s);
1838     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1839
1840     pa_stream_ref(s);
1841
1842     if (command != PA_COMMAND_REPLY) {
1843         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1844             goto finish;
1845
1846         pa_stream_set_state(s, PA_STREAM_FAILED);
1847         goto finish;
1848     } else if (!pa_tagstruct_eof(t)) {
1849         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1850         goto finish;
1851     }
1852
1853     pa_stream_set_state(s, PA_STREAM_TERMINATED);
1854
1855 finish:
1856     pa_stream_unref(s);
1857 }
1858
1859 int pa_stream_disconnect(pa_stream *s) {
1860     pa_tagstruct *t;
1861     uint32_t tag;
1862
1863     pa_assert(s);
1864     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1865
1866     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1867     PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1868     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1869
1870     pa_stream_ref(s);
1871
1872     t = pa_tagstruct_command(
1873             s->context,
1874             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1875                         (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1876             &tag);
1877     pa_tagstruct_putu32(t, s->channel);
1878     pa_pstream_send_tagstruct(s->context->pstream, t);
1879     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1880
1881     pa_stream_unref(s);
1882     return 0;
1883 }
1884
1885 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1886     pa_assert(s);
1887     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1888
1889     if (pa_detect_fork())
1890         return;
1891
1892     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1893         return;
1894
1895     s->read_callback = cb;
1896     s->read_userdata = userdata;
1897 }
1898
1899 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1900     pa_assert(s);
1901     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1902
1903     if (pa_detect_fork())
1904         return;
1905
1906     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1907         return;
1908
1909     s->write_callback = cb;
1910     s->write_userdata = userdata;
1911 }
1912
1913 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1914     pa_assert(s);
1915     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1916
1917     if (pa_detect_fork())
1918         return;
1919
1920     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1921         return;
1922
1923     s->state_callback = cb;
1924     s->state_userdata = userdata;
1925 }
1926
1927 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1928     pa_assert(s);
1929     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1930
1931     if (pa_detect_fork())
1932         return;
1933
1934     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1935         return;
1936
1937     s->overflow_callback = cb;
1938     s->overflow_userdata = userdata;
1939 }
1940
1941 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1942     pa_assert(s);
1943     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1944
1945     if (pa_detect_fork())
1946         return;
1947
1948     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1949         return;
1950
1951     s->underflow_callback = cb;
1952     s->underflow_userdata = userdata;
1953 }
1954
1955 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1956     pa_assert(s);
1957     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1958
1959     if (pa_detect_fork())
1960         return;
1961
1962     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1963         return;
1964
1965     s->latency_update_callback = cb;
1966     s->latency_update_userdata = userdata;
1967 }
1968
1969 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1970     pa_assert(s);
1971     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1972
1973     if (pa_detect_fork())
1974         return;
1975
1976     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1977         return;
1978
1979     s->moved_callback = cb;
1980     s->moved_userdata = userdata;
1981 }
1982
1983 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1984     pa_assert(s);
1985     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1986
1987     if (pa_detect_fork())
1988         return;
1989
1990     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1991         return;
1992
1993     s->suspended_callback = cb;
1994     s->suspended_userdata = userdata;
1995 }
1996
1997 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1998     pa_assert(s);
1999     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2000
2001     if (pa_detect_fork())
2002         return;
2003
2004     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2005         return;
2006
2007     s->started_callback = cb;
2008     s->started_userdata = userdata;
2009 }
2010
2011 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
2012     pa_assert(s);
2013     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2014
2015     if (pa_detect_fork())
2016         return;
2017
2018     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2019         return;
2020
2021     s->event_callback = cb;
2022     s->event_userdata = userdata;
2023 }
2024
2025 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2026     pa_assert(s);
2027     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2028
2029     if (pa_detect_fork())
2030         return;
2031
2032     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2033         return;
2034
2035     s->buffer_attr_callback = cb;
2036     s->buffer_attr_userdata = userdata;
2037 }
2038
2039 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2040     pa_operation *o = userdata;
2041     int success = 1;
2042
2043     pa_assert(pd);
2044     pa_assert(o);
2045     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2046
2047     if (!o->context)
2048         goto finish;
2049
2050     if (command != PA_COMMAND_REPLY) {
2051         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2052             goto finish;
2053
2054         success = 0;
2055     } else if (!pa_tagstruct_eof(t)) {
2056         pa_context_fail(o->context, PA_ERR_PROTOCOL);
2057         goto finish;
2058     }
2059
2060     if (o->callback) {
2061         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2062         cb(o->stream, success, o->userdata);
2063     }
2064
2065 finish:
2066     pa_operation_done(o);
2067     pa_operation_unref(o);
2068 }
2069
2070 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2071     pa_operation *o;
2072     pa_tagstruct *t;
2073     uint32_t tag;
2074
2075     pa_assert(s);
2076     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2077
2078     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2079     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2080     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2081
2082     /* Ask for a timing update before we cork/uncork to get the best
2083      * accuracy for the transport latency suitable for the
2084      * check_smoother_status() call in the started callback */
2085     request_auto_timing_update(s, TRUE);
2086
2087     s->corked = b;
2088
2089     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2090
2091     t = pa_tagstruct_command(
2092             s->context,
2093             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2094             &tag);
2095     pa_tagstruct_putu32(t, s->channel);
2096     pa_tagstruct_put_boolean(t, !!b);
2097     pa_pstream_send_tagstruct(s->context->pstream, t);
2098     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2099
2100     check_smoother_status(s, FALSE, FALSE, FALSE);
2101
2102     /* This might cause the indexes to hang/start again, hence let's
2103      * request a timing update, after the cork/uncork, too */
2104     request_auto_timing_update(s, TRUE);
2105
2106     return o;
2107 }
2108
2109 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2110     pa_tagstruct *t;
2111     pa_operation *o;
2112     uint32_t tag;
2113
2114     pa_assert(s);
2115     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2116
2117     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2118     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2119
2120     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2121
2122     t = pa_tagstruct_command(s->context, command, &tag);
2123     pa_tagstruct_putu32(t, s->channel);
2124     pa_pstream_send_tagstruct(s->context->pstream, t);
2125     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2126
2127     return o;
2128 }
2129
2130 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2131     pa_operation *o;
2132
2133     pa_assert(s);
2134     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2135
2136     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2137     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2138     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2139
2140     /* Ask for a timing update *before* the flush, so that the
2141      * transport usec is as up to date as possible when we get the
2142      * underflow message and update the smoother status*/
2143     request_auto_timing_update(s, TRUE);
2144
2145     if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2146         return NULL;
2147
2148     if (s->direction == PA_STREAM_PLAYBACK) {
2149
2150         if (s->write_index_corrections[s->current_write_index_correction].valid)
2151             s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2152
2153         if (s->buffer_attr.prebuf > 0)
2154             check_smoother_status(s, FALSE, FALSE, TRUE);
2155
2156         /* This will change the write index, but leave the
2157          * read index untouched. */
2158         invalidate_indexes(s, FALSE, TRUE);
2159
2160     } else
2161         /* For record streams this has no influence on the write
2162          * index, but the read index might jump. */
2163         invalidate_indexes(s, TRUE, FALSE);
2164
2165     /* Note that we do not update requested_bytes here. This is
2166      * because we cannot really know how data actually was dropped
2167      * from the write index due to this. This 'error' will be applied
2168      * by both client and server and hence we should be fine. */
2169
2170     return o;
2171 }
2172
2173 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2174     pa_operation *o;
2175
2176     pa_assert(s);
2177     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2178
2179     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2180     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2181     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2182     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2183
2184     /* Ask for a timing update before we cork/uncork to get the best
2185      * accuracy for the transport latency suitable for the
2186      * check_smoother_status() call in the started callback */
2187     request_auto_timing_update(s, TRUE);
2188
2189     if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2190         return NULL;
2191
2192     /* This might cause the read index to hang again, hence
2193      * let's request a timing update */
2194     request_auto_timing_update(s, TRUE);
2195
2196     return o;
2197 }
2198
2199 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2200     pa_operation *o;
2201
2202     pa_assert(s);
2203     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2204
2205     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2206     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2207     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2208     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2209
2210     /* Ask for a timing update before we cork/uncork to get the best
2211      * accuracy for the transport latency suitable for the
2212      * check_smoother_status() call in the started callback */
2213     request_auto_timing_update(s, TRUE);
2214
2215     if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2216         return NULL;
2217
2218     /* This might cause the read index to start moving again, hence
2219      * let's request a timing update */
2220     request_auto_timing_update(s, TRUE);
2221
2222     return o;
2223 }
2224
2225 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2226     pa_operation *o;
2227
2228     pa_assert(s);
2229     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2230     pa_assert(name);
2231
2232     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2233     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2234     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2235
2236     if (s->context->version >= 13) {
2237         pa_proplist *p = pa_proplist_new();
2238
2239         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2240         o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2241         pa_proplist_free(p);
2242     } else {
2243         pa_tagstruct *t;
2244         uint32_t tag;
2245
2246         o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2247         t = pa_tagstruct_command(
2248                 s->context,
2249                 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2250                 &tag);
2251         pa_tagstruct_putu32(t, s->channel);
2252         pa_tagstruct_puts(t, name);
2253         pa_pstream_send_tagstruct(s->context->pstream, t);
2254         pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2255     }
2256
2257     return o;
2258 }
2259
2260 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2261     pa_usec_t usec;
2262
2263     pa_assert(s);
2264     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2265
2266     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2267     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2268     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2269     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2270     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2271     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2272
2273     if (s->smoother)
2274         usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2275     else
2276         usec = calc_time(s, FALSE);
2277
2278     /* Make sure the time runs monotonically */
2279     if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2280         if (usec < s->previous_time)
2281             usec = s->previous_time;
2282         else
2283             s->previous_time = usec;
2284     }
2285
2286     if (r_usec)
2287         *r_usec = usec;
2288
2289     return 0;
2290 }
2291
2292 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2293     pa_assert(s);
2294     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2295
2296     if (negative)
2297         *negative = 0;
2298
2299     if (a >= b)
2300         return a-b;
2301     else {
2302         if (negative && s->direction == PA_STREAM_RECORD) {
2303             *negative = 1;
2304             return b-a;
2305         } else
2306             return 0;
2307     }
2308 }
2309
2310 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2311     pa_usec_t t, c;
2312     int r;
2313     int64_t cindex;
2314
2315     pa_assert(s);
2316     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2317     pa_assert(r_usec);
2318
2319     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2320     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2321     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2322     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2323     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2324     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2325
2326     if ((r = pa_stream_get_time(s, &t)) < 0)
2327         return r;
2328
2329     if (s->direction == PA_STREAM_PLAYBACK)
2330         cindex = s->timing_info.write_index;
2331     else
2332         cindex = s->timing_info.read_index;
2333
2334     if (cindex < 0)
2335         cindex = 0;
2336
2337     c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2338
2339     if (s->direction == PA_STREAM_PLAYBACK)
2340         *r_usec = time_counter_diff(s, c, t, negative);
2341     else
2342         *r_usec = time_counter_diff(s, t, c, negative);
2343
2344     return 0;
2345 }
2346
2347 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2348     pa_assert(s);
2349     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2350
2351     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2352     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2353     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2354     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2355
2356     return &s->timing_info;
2357 }
2358
2359 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2360     pa_assert(s);
2361     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2362
2363     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2364
2365     return &s->sample_spec;
2366 }
2367
2368 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2369     pa_assert(s);
2370     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2371
2372     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2373
2374     return &s->channel_map;
2375 }
2376
2377 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2378     pa_assert(s);
2379     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2380
2381     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2382     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2383     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2384
2385     return &s->buffer_attr;
2386 }
2387
2388 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2389     pa_operation *o = userdata;
2390     int success = 1;
2391
2392     pa_assert(pd);
2393     pa_assert(o);
2394     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2395
2396     if (!o->context)
2397         goto finish;
2398
2399     if (command != PA_COMMAND_REPLY) {
2400         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2401             goto finish;
2402
2403         success = 0;
2404     } else {
2405         if (o->stream->direction == PA_STREAM_PLAYBACK) {
2406             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2407                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2408                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2409                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2410                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2411                 goto finish;
2412             }
2413         } else if (o->stream->direction == PA_STREAM_RECORD) {
2414             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2415                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2416                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2417                 goto finish;
2418             }
2419         }
2420
2421         if (o->stream->context->version >= 13) {
2422             pa_usec_t usec;
2423
2424             if (pa_tagstruct_get_usec(t, &usec) < 0) {
2425                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2426                 goto finish;
2427             }
2428
2429             if (o->stream->direction == PA_STREAM_RECORD)
2430                 o->stream->timing_info.configured_source_usec = usec;
2431             else
2432                 o->stream->timing_info.configured_sink_usec = usec;
2433         }
2434
2435         if (!pa_tagstruct_eof(t)) {
2436             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2437             goto finish;
2438         }
2439     }
2440
2441     if (o->callback) {
2442         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2443         cb(o->stream, success, o->userdata);
2444     }
2445
2446 finish:
2447     pa_operation_done(o);
2448     pa_operation_unref(o);
2449 }
2450
2451
2452 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2453     pa_operation *o;
2454     pa_tagstruct *t;
2455     uint32_t tag;
2456     pa_buffer_attr copy;
2457
2458     pa_assert(s);
2459     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2460     pa_assert(attr);
2461
2462     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2463     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2464     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2465     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2466
2467     /* Ask for a timing update before we cork/uncork to get the best
2468      * accuracy for the transport latency suitable for the
2469      * check_smoother_status() call in the started callback */
2470     request_auto_timing_update(s, TRUE);
2471
2472     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2473
2474     t = pa_tagstruct_command(
2475             s->context,
2476             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2477             &tag);
2478     pa_tagstruct_putu32(t, s->channel);
2479
2480     copy = *attr;
2481     patch_buffer_attr(s, &copy, NULL);
2482     attr = &copy;
2483
2484     pa_tagstruct_putu32(t, attr->maxlength);
2485
2486     if (s->direction == PA_STREAM_PLAYBACK)
2487         pa_tagstruct_put(
2488                 t,
2489                 PA_TAG_U32, attr->tlength,
2490                 PA_TAG_U32, attr->prebuf,
2491                 PA_TAG_U32, attr->minreq,
2492                 PA_TAG_INVALID);
2493     else
2494         pa_tagstruct_putu32(t, attr->fragsize);
2495
2496     if (s->context->version >= 13)
2497         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2498
2499     if (s->context->version >= 14)
2500         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2501
2502     pa_pstream_send_tagstruct(s->context->pstream, t);
2503     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2504
2505     /* This might cause changes in the read/write indexex, hence let's
2506      * request a timing update */
2507     request_auto_timing_update(s, TRUE);
2508
2509     return o;
2510 }
2511
2512 uint32_t pa_stream_get_device_index(pa_stream *s) {
2513     pa_assert(s);
2514     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2515
2516     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2517     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2518     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2519     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2520     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2521
2522     return s->device_index;
2523 }
2524
2525 const char *pa_stream_get_device_name(pa_stream *s) {
2526     pa_assert(s);
2527     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2528
2529     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2530     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2531     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2532     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2533     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2534
2535     return s->device_name;
2536 }
2537
2538 int pa_stream_is_suspended(pa_stream *s) {
2539     pa_assert(s);
2540     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2541
2542     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2543     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2544     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2545     PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2546
2547     return s->suspended;
2548 }
2549
2550 int pa_stream_is_corked(pa_stream *s) {
2551     pa_assert(s);
2552     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2553
2554     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2555     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2556     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2557
2558     return s->corked;
2559 }
2560
2561 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2562     pa_operation *o = userdata;
2563     int success = 1;
2564
2565     pa_assert(pd);
2566     pa_assert(o);
2567     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2568
2569     if (!o->context)
2570         goto finish;
2571
2572     if (command != PA_COMMAND_REPLY) {
2573         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2574             goto finish;
2575
2576         success = 0;
2577     } else {
2578
2579         if (!pa_tagstruct_eof(t)) {
2580             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2581             goto finish;
2582         }
2583     }
2584
2585     o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2586     pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2587
2588     if (o->callback) {
2589         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2590         cb(o->stream, success, o->userdata);
2591     }
2592
2593 finish:
2594     pa_operation_done(o);
2595     pa_operation_unref(o);
2596 }
2597
2598
2599 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2600     pa_operation *o;
2601     pa_tagstruct *t;
2602     uint32_t tag;
2603
2604     pa_assert(s);
2605     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2606
2607     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2608     PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2609     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2610     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2611     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2612     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2613
2614     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2615     o->private = PA_UINT_TO_PTR(rate);
2616
2617     t = pa_tagstruct_command(
2618             s->context,
2619             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2620             &tag);
2621     pa_tagstruct_putu32(t, s->channel);
2622     pa_tagstruct_putu32(t, rate);
2623
2624     pa_pstream_send_tagstruct(s->context->pstream, t);
2625     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2626
2627     return o;
2628 }
2629
2630 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2631     pa_operation *o;
2632     pa_tagstruct *t;
2633     uint32_t tag;
2634
2635     pa_assert(s);
2636     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2637
2638     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2639     PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2640     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2641     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2642     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2643
2644     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2645
2646     t = pa_tagstruct_command(
2647             s->context,
2648             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2649             &tag);
2650     pa_tagstruct_putu32(t, s->channel);
2651     pa_tagstruct_putu32(t, (uint32_t) mode);
2652     pa_tagstruct_put_proplist(t, p);
2653
2654     pa_pstream_send_tagstruct(s->context->pstream, t);
2655     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2656
2657     /* Please note that we don't update s->proplist here, because we
2658      * don't export that field */
2659
2660     return o;
2661 }
2662
2663 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2664     pa_operation *o;
2665     pa_tagstruct *t;
2666     uint32_t tag;
2667     const char * const*k;
2668
2669     pa_assert(s);
2670     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2671
2672     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2673     PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2674     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2675     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2676     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2677
2678     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2679
2680     t = pa_tagstruct_command(
2681             s->context,
2682             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2683             &tag);
2684     pa_tagstruct_putu32(t, s->channel);
2685
2686     for (k = keys; *k; k++)
2687         pa_tagstruct_puts(t, *k);
2688
2689     pa_tagstruct_puts(t, NULL);
2690
2691     pa_pstream_send_tagstruct(s->context->pstream, t);
2692     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2693
2694     /* Please note that we don't update s->proplist here, because we
2695      * don't export that field */
2696
2697     return o;
2698 }
2699
2700 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2701     pa_assert(s);
2702     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2703
2704     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2705     PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2706     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2707     PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2708
2709     s->direct_on_input = sink_input_idx;
2710
2711     return 0;
2712 }
2713
2714 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2715     pa_assert(s);
2716     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2717
2718     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2719     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2720     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2721
2722     return s->direct_on_input;
2723 }