native: fix request counter miscalculations
[profile/ivi/pulseaudio.git] / src / pulse / stream.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/rtclock.h>
34 #include <pulse/xmalloc.h>
35
36 #include <pulsecore/pstream-util.h>
37 #include <pulsecore/log.h>
38 #include <pulsecore/hashmap.h>
39 #include <pulsecore/macro.h>
40 #include <pulsecore/core-rtclock.h>
41
42 #include "fork-detect.h"
43 #include "internal.h"
44
45 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
46 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
47
48 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
49 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
50 #define SMOOTHER_MIN_HISTORY (4)
51
52 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
53     return pa_stream_new_with_proplist(c, name, ss, map, NULL);
54 }
55
56 static void reset_callbacks(pa_stream *s) {
57     s->read_callback = NULL;
58     s->read_userdata = NULL;
59     s->write_callback = NULL;
60     s->write_userdata = NULL;
61     s->state_callback = NULL;
62     s->state_userdata = NULL;
63     s->overflow_callback = NULL;
64     s->overflow_userdata = NULL;
65     s->underflow_callback = NULL;
66     s->underflow_userdata = NULL;
67     s->latency_update_callback = NULL;
68     s->latency_update_userdata = NULL;
69     s->moved_callback = NULL;
70     s->moved_userdata = NULL;
71     s->suspended_callback = NULL;
72     s->suspended_userdata = NULL;
73     s->started_callback = NULL;
74     s->started_userdata = NULL;
75     s->event_callback = NULL;
76     s->event_userdata = NULL;
77     s->buffer_attr_callback = NULL;
78     s->buffer_attr_userdata = NULL;
79 }
80
81 pa_stream *pa_stream_new_with_proplist(
82         pa_context *c,
83         const char *name,
84         const pa_sample_spec *ss,
85         const pa_channel_map *map,
86         pa_proplist *p) {
87
88     pa_stream *s;
89     int i;
90     pa_channel_map tmap;
91
92     pa_assert(c);
93     pa_assert(PA_REFCNT_VALUE(c) >= 1);
94
95     PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
96     PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
97     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
98     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
99     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
100     PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
101     PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
102
103     if (!map)
104         PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
105
106     s = pa_xnew(pa_stream, 1);
107     PA_REFCNT_INIT(s);
108     s->context = c;
109     s->mainloop = c->mainloop;
110
111     s->direction = PA_STREAM_NODIRECTION;
112     s->state = PA_STREAM_UNCONNECTED;
113     s->flags = 0;
114
115     s->sample_spec = *ss;
116     s->channel_map = *map;
117
118     s->direct_on_input = PA_INVALID_INDEX;
119
120     s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
121     if (name)
122         pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
123
124     s->channel = 0;
125     s->channel_valid = FALSE;
126     s->syncid = c->csyncid++;
127     s->stream_index = PA_INVALID_INDEX;
128
129     s->requested_bytes = 0;
130     memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
131
132     /* We initialize der target length here, so that if the user
133      * passes no explicit buffering metrics the default is similar to
134      * what older PA versions provided. */
135
136     s->buffer_attr.maxlength = (uint32_t) -1;
137     s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
138     s->buffer_attr.minreq = (uint32_t) -1;
139     s->buffer_attr.prebuf = (uint32_t) -1;
140     s->buffer_attr.fragsize = (uint32_t) -1;
141
142     s->device_index = PA_INVALID_INDEX;
143     s->device_name = NULL;
144     s->suspended = FALSE;
145     s->corked = FALSE;
146
147     s->write_memblock = NULL;
148     s->write_data = NULL;
149
150     pa_memchunk_reset(&s->peek_memchunk);
151     s->peek_data = NULL;
152     s->record_memblockq = NULL;
153
154     memset(&s->timing_info, 0, sizeof(s->timing_info));
155     s->timing_info_valid = FALSE;
156
157     s->previous_time = 0;
158
159     s->read_index_not_before = 0;
160     s->write_index_not_before = 0;
161     for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
162         s->write_index_corrections[i].valid = 0;
163     s->current_write_index_correction = 0;
164
165     s->auto_timing_update_event = NULL;
166     s->auto_timing_update_requested = FALSE;
167     s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
168
169     reset_callbacks(s);
170
171     s->smoother = NULL;
172
173     /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
174     PA_LLIST_PREPEND(pa_stream, c->streams, s);
175     pa_stream_ref(s);
176
177     return s;
178 }
179
180 static void stream_unlink(pa_stream *s) {
181     pa_operation *o, *n;
182     pa_assert(s);
183
184     if (!s->context)
185         return;
186
187     /* Detach from context */
188
189     /* Unref all operatio object that point to us */
190     for (o = s->context->operations; o; o = n) {
191         n = o->next;
192
193         if (o->stream == s)
194             pa_operation_cancel(o);
195     }
196
197     /* Drop all outstanding replies for this stream */
198     if (s->context->pdispatch)
199         pa_pdispatch_unregister_reply(s->context->pdispatch, s);
200
201     if (s->channel_valid) {
202         pa_hashmap_remove((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, PA_UINT32_TO_PTR(s->channel));
203         s->channel = 0;
204         s->channel_valid = FALSE;
205     }
206
207     PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
208     pa_stream_unref(s);
209
210     s->context = NULL;
211
212     if (s->auto_timing_update_event) {
213         pa_assert(s->mainloop);
214         s->mainloop->time_free(s->auto_timing_update_event);
215     }
216
217     reset_callbacks(s);
218 }
219
220 static void stream_free(pa_stream *s) {
221     pa_assert(s);
222
223     stream_unlink(s);
224
225     if (s->write_memblock) {
226         pa_memblock_release(s->write_memblock);
227         pa_memblock_unref(s->write_data);
228     }
229
230     if (s->peek_memchunk.memblock) {
231         if (s->peek_data)
232             pa_memblock_release(s->peek_memchunk.memblock);
233         pa_memblock_unref(s->peek_memchunk.memblock);
234     }
235
236     if (s->record_memblockq)
237         pa_memblockq_free(s->record_memblockq);
238
239     if (s->proplist)
240         pa_proplist_free(s->proplist);
241
242     if (s->smoother)
243         pa_smoother_free(s->smoother);
244
245     pa_xfree(s->device_name);
246     pa_xfree(s);
247 }
248
249 void pa_stream_unref(pa_stream *s) {
250     pa_assert(s);
251     pa_assert(PA_REFCNT_VALUE(s) >= 1);
252
253     if (PA_REFCNT_DEC(s) <= 0)
254         stream_free(s);
255 }
256
257 pa_stream* pa_stream_ref(pa_stream *s) {
258     pa_assert(s);
259     pa_assert(PA_REFCNT_VALUE(s) >= 1);
260
261     PA_REFCNT_INC(s);
262     return s;
263 }
264
265 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
266     pa_assert(s);
267     pa_assert(PA_REFCNT_VALUE(s) >= 1);
268
269     return s->state;
270 }
271
272 pa_context* pa_stream_get_context(pa_stream *s) {
273     pa_assert(s);
274     pa_assert(PA_REFCNT_VALUE(s) >= 1);
275
276     return s->context;
277 }
278
279 uint32_t pa_stream_get_index(pa_stream *s) {
280     pa_assert(s);
281     pa_assert(PA_REFCNT_VALUE(s) >= 1);
282
283     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
284     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
285
286     return s->stream_index;
287 }
288
289 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
290     pa_assert(s);
291     pa_assert(PA_REFCNT_VALUE(s) >= 1);
292
293     if (s->state == st)
294         return;
295
296     pa_stream_ref(s);
297
298     s->state = st;
299
300     if (s->state_callback)
301         s->state_callback(s, s->state_userdata);
302
303     if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
304         stream_unlink(s);
305
306     pa_stream_unref(s);
307 }
308
309 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
310     pa_assert(s);
311     pa_assert(PA_REFCNT_VALUE(s) >= 1);
312
313     if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
314         return;
315
316     if (s->state == PA_STREAM_READY &&
317         (force || !s->auto_timing_update_requested)) {
318         pa_operation *o;
319
320 /*         pa_log("Automatically requesting new timing data"); */
321
322         if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
323             pa_operation_unref(o);
324             s->auto_timing_update_requested = TRUE;
325         }
326     }
327
328     if (s->auto_timing_update_event) {
329         if (force)
330             s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
331
332         pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
333
334         s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
335     }
336 }
337
338 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
339     pa_context *c = userdata;
340     pa_stream *s;
341     uint32_t channel;
342
343     pa_assert(pd);
344     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
345     pa_assert(t);
346     pa_assert(c);
347     pa_assert(PA_REFCNT_VALUE(c) >= 1);
348
349     pa_context_ref(c);
350
351     if (pa_tagstruct_getu32(t, &channel) < 0 ||
352         !pa_tagstruct_eof(t)) {
353         pa_context_fail(c, PA_ERR_PROTOCOL);
354         goto finish;
355     }
356
357     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
358         goto finish;
359
360     if (s->state != PA_STREAM_READY)
361         goto finish;
362
363     pa_context_set_error(c, PA_ERR_KILLED);
364     pa_stream_set_state(s, PA_STREAM_FAILED);
365
366 finish:
367     pa_context_unref(c);
368 }
369
370 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
371     pa_usec_t x;
372
373     pa_assert(s);
374     pa_assert(!force_start || !force_stop);
375
376     if (!s->smoother)
377         return;
378
379     x = pa_rtclock_now();
380
381     if (s->timing_info_valid) {
382         if (aposteriori)
383             x -= s->timing_info.transport_usec;
384         else
385             x += s->timing_info.transport_usec;
386     }
387
388     if (s->suspended || s->corked || force_stop)
389         pa_smoother_pause(s->smoother, x);
390     else if (force_start || s->buffer_attr.prebuf == 0) {
391
392         if (!s->timing_info_valid &&
393             !aposteriori &&
394             !force_start &&
395             !force_stop &&
396             s->context->version >= 13) {
397
398             /* If the server supports STARTED events we take them as
399              * indications when audio really starts/stops playing, if
400              * we don't have any timing info yet -- instead of trying
401              * to be smart and guessing the server time. Otherwise the
402              * unknown transport delay add too much noise to our time
403              * calculations. */
404
405             return;
406         }
407
408         pa_smoother_resume(s->smoother, x, TRUE);
409     }
410
411     /* Please note that we have no idea if playback actually started
412      * if prebuf is non-zero! */
413 }
414
415 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
416     pa_context *c = userdata;
417     pa_stream *s;
418     uint32_t channel;
419     const char *dn;
420     pa_bool_t suspended;
421     uint32_t di;
422     pa_usec_t usec = 0;
423     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
424
425     pa_assert(pd);
426     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
427     pa_assert(t);
428     pa_assert(c);
429     pa_assert(PA_REFCNT_VALUE(c) >= 1);
430
431     pa_context_ref(c);
432
433     if (c->version < 12) {
434         pa_context_fail(c, PA_ERR_PROTOCOL);
435         goto finish;
436     }
437
438     if (pa_tagstruct_getu32(t, &channel) < 0 ||
439         pa_tagstruct_getu32(t, &di) < 0 ||
440         pa_tagstruct_gets(t, &dn) < 0 ||
441         pa_tagstruct_get_boolean(t, &suspended) < 0) {
442         pa_context_fail(c, PA_ERR_PROTOCOL);
443         goto finish;
444     }
445
446     if (c->version >= 13) {
447
448         if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
449             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
450                 pa_tagstruct_getu32(t, &fragsize) < 0 ||
451                 pa_tagstruct_get_usec(t, &usec) < 0) {
452                 pa_context_fail(c, PA_ERR_PROTOCOL);
453                 goto finish;
454             }
455         } else {
456             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
457                 pa_tagstruct_getu32(t, &tlength) < 0 ||
458                 pa_tagstruct_getu32(t, &prebuf) < 0 ||
459                 pa_tagstruct_getu32(t, &minreq) < 0 ||
460                 pa_tagstruct_get_usec(t, &usec) < 0) {
461                 pa_context_fail(c, PA_ERR_PROTOCOL);
462                 goto finish;
463             }
464         }
465     }
466
467     if (!pa_tagstruct_eof(t)) {
468         pa_context_fail(c, PA_ERR_PROTOCOL);
469         goto finish;
470     }
471
472     if (!dn || di == PA_INVALID_INDEX) {
473         pa_context_fail(c, PA_ERR_PROTOCOL);
474         goto finish;
475     }
476
477     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
478         goto finish;
479
480     if (s->state != PA_STREAM_READY)
481         goto finish;
482
483     if (c->version >= 13) {
484         if (s->direction == PA_STREAM_RECORD)
485             s->timing_info.configured_source_usec = usec;
486         else
487             s->timing_info.configured_sink_usec = usec;
488
489         s->buffer_attr.maxlength = maxlength;
490         s->buffer_attr.fragsize = fragsize;
491         s->buffer_attr.tlength = tlength;
492         s->buffer_attr.prebuf = prebuf;
493         s->buffer_attr.minreq = minreq;
494     }
495
496     pa_xfree(s->device_name);
497     s->device_name = pa_xstrdup(dn);
498     s->device_index = di;
499
500     s->suspended = suspended;
501
502     check_smoother_status(s, TRUE, FALSE, FALSE);
503     request_auto_timing_update(s, TRUE);
504
505     if (s->moved_callback)
506         s->moved_callback(s, s->moved_userdata);
507
508 finish:
509     pa_context_unref(c);
510 }
511
512 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
513     pa_context *c = userdata;
514     pa_stream *s;
515     uint32_t channel;
516     pa_usec_t usec = 0;
517     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
518
519     pa_assert(pd);
520     pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
521     pa_assert(t);
522     pa_assert(c);
523     pa_assert(PA_REFCNT_VALUE(c) >= 1);
524
525     pa_context_ref(c);
526
527     if (c->version < 15) {
528         pa_context_fail(c, PA_ERR_PROTOCOL);
529         goto finish;
530     }
531
532     if (pa_tagstruct_getu32(t, &channel) < 0) {
533         pa_context_fail(c, PA_ERR_PROTOCOL);
534         goto finish;
535     }
536
537     if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
538         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
539             pa_tagstruct_getu32(t, &fragsize) < 0 ||
540             pa_tagstruct_get_usec(t, &usec) < 0) {
541             pa_context_fail(c, PA_ERR_PROTOCOL);
542             goto finish;
543         }
544     } else {
545         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
546             pa_tagstruct_getu32(t, &tlength) < 0 ||
547             pa_tagstruct_getu32(t, &prebuf) < 0 ||
548             pa_tagstruct_getu32(t, &minreq) < 0 ||
549             pa_tagstruct_get_usec(t, &usec) < 0) {
550             pa_context_fail(c, PA_ERR_PROTOCOL);
551             goto finish;
552         }
553     }
554
555     if (!pa_tagstruct_eof(t)) {
556         pa_context_fail(c, PA_ERR_PROTOCOL);
557         goto finish;
558     }
559
560     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
561         goto finish;
562
563     if (s->state != PA_STREAM_READY)
564         goto finish;
565
566     if (s->direction == PA_STREAM_RECORD)
567         s->timing_info.configured_source_usec = usec;
568     else
569         s->timing_info.configured_sink_usec = usec;
570
571     s->buffer_attr.maxlength = maxlength;
572     s->buffer_attr.fragsize = fragsize;
573     s->buffer_attr.tlength = tlength;
574     s->buffer_attr.prebuf = prebuf;
575     s->buffer_attr.minreq = minreq;
576
577     request_auto_timing_update(s, TRUE);
578
579     if (s->buffer_attr_callback)
580         s->buffer_attr_callback(s, s->buffer_attr_userdata);
581
582 finish:
583     pa_context_unref(c);
584 }
585
586 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
587     pa_context *c = userdata;
588     pa_stream *s;
589     uint32_t channel;
590     pa_bool_t suspended;
591
592     pa_assert(pd);
593     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
594     pa_assert(t);
595     pa_assert(c);
596     pa_assert(PA_REFCNT_VALUE(c) >= 1);
597
598     pa_context_ref(c);
599
600     if (c->version < 12) {
601         pa_context_fail(c, PA_ERR_PROTOCOL);
602         goto finish;
603     }
604
605     if (pa_tagstruct_getu32(t, &channel) < 0 ||
606         pa_tagstruct_get_boolean(t, &suspended) < 0 ||
607         !pa_tagstruct_eof(t)) {
608         pa_context_fail(c, PA_ERR_PROTOCOL);
609         goto finish;
610     }
611
612     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
613         goto finish;
614
615     if (s->state != PA_STREAM_READY)
616         goto finish;
617
618     s->suspended = suspended;
619
620     check_smoother_status(s, TRUE, FALSE, FALSE);
621     request_auto_timing_update(s, TRUE);
622
623     if (s->suspended_callback)
624         s->suspended_callback(s, s->suspended_userdata);
625
626 finish:
627     pa_context_unref(c);
628 }
629
630 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
631     pa_context *c = userdata;
632     pa_stream *s;
633     uint32_t channel;
634
635     pa_assert(pd);
636     pa_assert(command == PA_COMMAND_STARTED);
637     pa_assert(t);
638     pa_assert(c);
639     pa_assert(PA_REFCNT_VALUE(c) >= 1);
640
641     pa_context_ref(c);
642
643     if (c->version < 13) {
644         pa_context_fail(c, PA_ERR_PROTOCOL);
645         goto finish;
646     }
647
648     if (pa_tagstruct_getu32(t, &channel) < 0 ||
649         !pa_tagstruct_eof(t)) {
650         pa_context_fail(c, PA_ERR_PROTOCOL);
651         goto finish;
652     }
653
654     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
655         goto finish;
656
657     if (s->state != PA_STREAM_READY)
658         goto finish;
659
660     check_smoother_status(s, TRUE, TRUE, FALSE);
661     request_auto_timing_update(s, TRUE);
662
663     if (s->started_callback)
664         s->started_callback(s, s->started_userdata);
665
666 finish:
667     pa_context_unref(c);
668 }
669
670 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
671     pa_context *c = userdata;
672     pa_stream *s;
673     uint32_t channel;
674     pa_proplist *pl = NULL;
675     const char *event;
676
677     pa_assert(pd);
678     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
679     pa_assert(t);
680     pa_assert(c);
681     pa_assert(PA_REFCNT_VALUE(c) >= 1);
682
683     pa_context_ref(c);
684
685     if (c->version < 15) {
686         pa_context_fail(c, PA_ERR_PROTOCOL);
687         goto finish;
688     }
689
690     pl = pa_proplist_new();
691
692     if (pa_tagstruct_getu32(t, &channel) < 0 ||
693         pa_tagstruct_gets(t, &event) < 0 ||
694         pa_tagstruct_get_proplist(t, pl) < 0 ||
695         !pa_tagstruct_eof(t) || !event) {
696         pa_context_fail(c, PA_ERR_PROTOCOL);
697         goto finish;
698     }
699
700     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
701         goto finish;
702
703     if (s->state != PA_STREAM_READY)
704         goto finish;
705
706     if (s->event_callback)
707         s->event_callback(s, event, pl, s->event_userdata);
708
709 finish:
710     pa_context_unref(c);
711
712     if (pl)
713         pa_proplist_free(pl);
714 }
715
716 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
717     pa_stream *s;
718     pa_context *c = userdata;
719     uint32_t bytes, channel;
720
721     pa_assert(pd);
722     pa_assert(command == PA_COMMAND_REQUEST);
723     pa_assert(t);
724     pa_assert(c);
725     pa_assert(PA_REFCNT_VALUE(c) >= 1);
726
727     pa_context_ref(c);
728
729     if (pa_tagstruct_getu32(t, &channel) < 0 ||
730         pa_tagstruct_getu32(t, &bytes) < 0 ||
731         !pa_tagstruct_eof(t)) {
732         pa_context_fail(c, PA_ERR_PROTOCOL);
733         goto finish;
734     }
735
736     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
737         goto finish;
738
739     if (s->state != PA_STREAM_READY)
740         goto finish;
741
742     s->requested_bytes += bytes;
743
744     /* pa_log("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes); */
745
746     if (s->requested_bytes > 0 && s->write_callback)
747         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
748
749 finish:
750     pa_context_unref(c);
751 }
752
753 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
754     pa_stream *s;
755     pa_context *c = userdata;
756     uint32_t channel;
757
758     pa_assert(pd);
759     pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
760     pa_assert(t);
761     pa_assert(c);
762     pa_assert(PA_REFCNT_VALUE(c) >= 1);
763
764     pa_context_ref(c);
765
766     if (pa_tagstruct_getu32(t, &channel) < 0 ||
767         !pa_tagstruct_eof(t)) {
768         pa_context_fail(c, PA_ERR_PROTOCOL);
769         goto finish;
770     }
771
772     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
773         goto finish;
774
775     if (s->state != PA_STREAM_READY)
776         goto finish;
777
778     if (s->buffer_attr.prebuf > 0)
779         check_smoother_status(s, TRUE, FALSE, TRUE);
780
781     request_auto_timing_update(s, TRUE);
782
783     if (command == PA_COMMAND_OVERFLOW) {
784         if (s->overflow_callback)
785             s->overflow_callback(s, s->overflow_userdata);
786     } else if (command == PA_COMMAND_UNDERFLOW) {
787         if (s->underflow_callback)
788             s->underflow_callback(s, s->underflow_userdata);
789     }
790
791  finish:
792     pa_context_unref(c);
793 }
794
795 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
796     pa_assert(s);
797     pa_assert(PA_REFCNT_VALUE(s) >= 1);
798
799 /*     pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
800
801     if (s->state != PA_STREAM_READY)
802         return;
803
804     if (w) {
805         s->write_index_not_before = s->context->ctag;
806
807         if (s->timing_info_valid)
808             s->timing_info.write_index_corrupt = TRUE;
809
810 /*         pa_log("write_index invalidated"); */
811     }
812
813     if (r) {
814         s->read_index_not_before = s->context->ctag;
815
816         if (s->timing_info_valid)
817             s->timing_info.read_index_corrupt = TRUE;
818
819 /*         pa_log("read_index invalidated"); */
820     }
821
822     request_auto_timing_update(s, TRUE);
823 }
824
825 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
826     pa_stream *s = userdata;
827
828     pa_assert(s);
829     pa_assert(PA_REFCNT_VALUE(s) >= 1);
830
831     pa_stream_ref(s);
832     request_auto_timing_update(s, FALSE);
833     pa_stream_unref(s);
834 }
835
836 static void create_stream_complete(pa_stream *s) {
837     pa_assert(s);
838     pa_assert(PA_REFCNT_VALUE(s) >= 1);
839     pa_assert(s->state == PA_STREAM_CREATING);
840
841     pa_stream_set_state(s, PA_STREAM_READY);
842
843     if (s->requested_bytes > 0 && s->write_callback)
844         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
845
846     if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
847         s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
848         pa_assert(!s->auto_timing_update_event);
849         s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
850
851         request_auto_timing_update(s, TRUE);
852     }
853
854     check_smoother_status(s, TRUE, FALSE, FALSE);
855 }
856
857 static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_sample_spec *ss) {
858     pa_assert(s);
859     pa_assert(attr);
860     pa_assert(ss);
861
862     if (s->context->version >= 13)
863         return;
864
865     /* Version older than 0.9.10 didn't do server side buffer_attr
866      * selection, hence we have to fake it on the client side. */
867
868     /* We choose fairly conservative values here, to not confuse
869      * old clients with extremely large playback buffers */
870
871     if (attr->maxlength == (uint32_t) -1)
872         attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
873
874     if (attr->tlength == (uint32_t) -1)
875         attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
876
877     if (attr->minreq == (uint32_t) -1)
878         attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
879
880     if (attr->prebuf == (uint32_t) -1)
881         attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
882
883     if (attr->fragsize == (uint32_t) -1)
884         attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
885 }
886
887 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
888     pa_stream *s = userdata;
889     uint32_t requested_bytes = 0;
890
891     pa_assert(pd);
892     pa_assert(s);
893     pa_assert(PA_REFCNT_VALUE(s) >= 1);
894     pa_assert(s->state == PA_STREAM_CREATING);
895
896     pa_stream_ref(s);
897
898     if (command != PA_COMMAND_REPLY) {
899         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
900             goto finish;
901
902         pa_stream_set_state(s, PA_STREAM_FAILED);
903         goto finish;
904     }
905
906     if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
907         s->channel == PA_INVALID_INDEX ||
908         ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 ||  s->stream_index == PA_INVALID_INDEX)) ||
909         ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
910         pa_context_fail(s->context, PA_ERR_PROTOCOL);
911         goto finish;
912     }
913
914     s->requested_bytes = (int64_t) requested_bytes;
915
916     if (s->context->version >= 9) {
917         if (s->direction == PA_STREAM_PLAYBACK) {
918             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
919                 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
920                 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
921                 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
922                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
923                 goto finish;
924             }
925         } else if (s->direction == PA_STREAM_RECORD) {
926             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
927                 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
928                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
929                 goto finish;
930             }
931         }
932     }
933
934     if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
935         pa_sample_spec ss;
936         pa_channel_map cm;
937         const char *dn = NULL;
938         pa_bool_t suspended;
939
940         if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
941             pa_tagstruct_get_channel_map(t, &cm) < 0 ||
942             pa_tagstruct_getu32(t, &s->device_index) < 0 ||
943             pa_tagstruct_gets(t, &dn) < 0 ||
944             pa_tagstruct_get_boolean(t, &suspended) < 0) {
945             pa_context_fail(s->context, PA_ERR_PROTOCOL);
946             goto finish;
947         }
948
949         if (!dn || s->device_index == PA_INVALID_INDEX ||
950             ss.channels != cm.channels ||
951             !pa_channel_map_valid(&cm) ||
952             !pa_sample_spec_valid(&ss) ||
953             (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
954             (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
955             (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))) {
956             pa_context_fail(s->context, PA_ERR_PROTOCOL);
957             goto finish;
958         }
959
960         pa_xfree(s->device_name);
961         s->device_name = pa_xstrdup(dn);
962         s->suspended = suspended;
963
964         s->channel_map = cm;
965         s->sample_spec = ss;
966     }
967
968     if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
969         pa_usec_t usec;
970
971         if (pa_tagstruct_get_usec(t, &usec) < 0) {
972             pa_context_fail(s->context, PA_ERR_PROTOCOL);
973             goto finish;
974         }
975
976         if (s->direction == PA_STREAM_RECORD)
977             s->timing_info.configured_source_usec = usec;
978         else
979             s->timing_info.configured_sink_usec = usec;
980     }
981
982     if (!pa_tagstruct_eof(t)) {
983         pa_context_fail(s->context, PA_ERR_PROTOCOL);
984         goto finish;
985     }
986
987     if (s->direction == PA_STREAM_RECORD) {
988         pa_assert(!s->record_memblockq);
989
990         s->record_memblockq = pa_memblockq_new(
991                 0,
992                 s->buffer_attr.maxlength,
993                 0,
994                 pa_frame_size(&s->sample_spec),
995                 1,
996                 0,
997                 0,
998                 NULL);
999     }
1000
1001     s->channel_valid = TRUE;
1002     pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);
1003
1004     create_stream_complete(s);
1005
1006 finish:
1007     pa_stream_unref(s);
1008 }
1009
1010 static int create_stream(
1011         pa_stream_direction_t direction,
1012         pa_stream *s,
1013         const char *dev,
1014         const pa_buffer_attr *attr,
1015         pa_stream_flags_t flags,
1016         const pa_cvolume *volume,
1017         pa_stream *sync_stream) {
1018
1019     pa_tagstruct *t;
1020     uint32_t tag;
1021     pa_bool_t volume_set = FALSE;
1022
1023     pa_assert(s);
1024     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1025     pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1026
1027     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1028     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1029     PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1030     PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1031                                               PA_STREAM_INTERPOLATE_TIMING|
1032                                               PA_STREAM_NOT_MONOTONIC|
1033                                               PA_STREAM_AUTO_TIMING_UPDATE|
1034                                               PA_STREAM_NO_REMAP_CHANNELS|
1035                                               PA_STREAM_NO_REMIX_CHANNELS|
1036                                               PA_STREAM_FIX_FORMAT|
1037                                               PA_STREAM_FIX_RATE|
1038                                               PA_STREAM_FIX_CHANNELS|
1039                                               PA_STREAM_DONT_MOVE|
1040                                               PA_STREAM_VARIABLE_RATE|
1041                                               PA_STREAM_PEAK_DETECT|
1042                                               PA_STREAM_START_MUTED|
1043                                               PA_STREAM_ADJUST_LATENCY|
1044                                               PA_STREAM_EARLY_REQUESTS|
1045                                               PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1046                                               PA_STREAM_START_UNMUTED|
1047                                               PA_STREAM_FAIL_ON_SUSPEND|
1048                                               PA_STREAM_RELATIVE_VOLUME)), PA_ERR_INVALID);
1049
1050     PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1051     PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1052     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1053     /* Althought some of the other flags are not supported on older
1054      * version, we don't check for them here, because it doesn't hurt
1055      * when they are passed but actually not supported. This makes
1056      * client development easier */
1057
1058     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1059     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1060     PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
1061     PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1062     PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1063
1064     pa_stream_ref(s);
1065
1066     s->direction = direction;
1067     s->flags = flags;
1068     s->corked = !!(flags & PA_STREAM_START_CORKED);
1069
1070     if (sync_stream)
1071         s->syncid = sync_stream->syncid;
1072
1073     if (attr)
1074         s->buffer_attr = *attr;
1075     automatic_buffer_attr(s, &s->buffer_attr, &s->sample_spec);
1076
1077     if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1078         pa_usec_t x;
1079
1080         x = pa_rtclock_now();
1081
1082         pa_assert(!s->smoother);
1083         s->smoother = pa_smoother_new(
1084                 SMOOTHER_ADJUST_TIME,
1085                 SMOOTHER_HISTORY_TIME,
1086                 !(flags & PA_STREAM_NOT_MONOTONIC),
1087                 TRUE,
1088                 SMOOTHER_MIN_HISTORY,
1089                 x,
1090                 TRUE);
1091     }
1092
1093     if (!dev)
1094         dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1095
1096     t = pa_tagstruct_command(
1097             s->context,
1098             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1099             &tag);
1100
1101     if (s->context->version < 13)
1102         pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1103
1104     pa_tagstruct_put(
1105             t,
1106             PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1107             PA_TAG_CHANNEL_MAP, &s->channel_map,
1108             PA_TAG_U32, PA_INVALID_INDEX,
1109             PA_TAG_STRING, dev,
1110             PA_TAG_U32, s->buffer_attr.maxlength,
1111             PA_TAG_BOOLEAN, s->corked,
1112             PA_TAG_INVALID);
1113
1114     if (s->direction == PA_STREAM_PLAYBACK) {
1115         pa_cvolume cv;
1116
1117         pa_tagstruct_put(
1118                 t,
1119                 PA_TAG_U32, s->buffer_attr.tlength,
1120                 PA_TAG_U32, s->buffer_attr.prebuf,
1121                 PA_TAG_U32, s->buffer_attr.minreq,
1122                 PA_TAG_U32, s->syncid,
1123                 PA_TAG_INVALID);
1124
1125         volume_set = !!volume;
1126
1127         if (!volume)
1128             volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1129
1130         pa_tagstruct_put_cvolume(t, volume);
1131     } else
1132         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1133
1134     if (s->context->version >= 12) {
1135         pa_tagstruct_put(
1136                 t,
1137                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1138                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1139                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1140                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1141                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1142                 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1143                 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1144                 PA_TAG_INVALID);
1145     }
1146
1147     if (s->context->version >= 13) {
1148
1149         if (s->direction == PA_STREAM_PLAYBACK)
1150             pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1151         else
1152             pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1153
1154         pa_tagstruct_put(
1155                 t,
1156                 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1157                 PA_TAG_PROPLIST, s->proplist,
1158                 PA_TAG_INVALID);
1159
1160         if (s->direction == PA_STREAM_RECORD)
1161             pa_tagstruct_putu32(t, s->direct_on_input);
1162     }
1163
1164     if (s->context->version >= 14) {
1165
1166         if (s->direction == PA_STREAM_PLAYBACK)
1167             pa_tagstruct_put_boolean(t, volume_set);
1168
1169         pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1170     }
1171
1172     if (s->context->version >= 15) {
1173
1174         if (s->direction == PA_STREAM_PLAYBACK)
1175             pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1176
1177         pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1178         pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1179     }
1180
1181     if (s->context->version >= 17) {
1182
1183         if (s->direction == PA_STREAM_PLAYBACK)
1184             pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1185
1186     }
1187
1188     pa_pstream_send_tagstruct(s->context->pstream, t);
1189     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1190
1191     pa_stream_set_state(s, PA_STREAM_CREATING);
1192
1193     pa_stream_unref(s);
1194     return 0;
1195 }
1196
1197 int pa_stream_connect_playback(
1198         pa_stream *s,
1199         const char *dev,
1200         const pa_buffer_attr *attr,
1201         pa_stream_flags_t flags,
1202         const pa_cvolume *volume,
1203         pa_stream *sync_stream) {
1204
1205     pa_assert(s);
1206     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1207
1208     return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1209 }
1210
1211 int pa_stream_connect_record(
1212         pa_stream *s,
1213         const char *dev,
1214         const pa_buffer_attr *attr,
1215         pa_stream_flags_t flags) {
1216
1217     pa_assert(s);
1218     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1219
1220     return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1221 }
1222
1223 int pa_stream_begin_write(
1224         pa_stream *s,
1225         void **data,
1226         size_t *nbytes) {
1227
1228     pa_assert(s);
1229     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1230
1231     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1232     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1233     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1234     PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1235     PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1236
1237     if (*nbytes != (size_t) -1) {
1238         size_t m, fs;
1239
1240         m = pa_mempool_block_size_max(s->context->mempool);
1241         fs = pa_frame_size(&s->sample_spec);
1242
1243         m = (m / fs) * fs;
1244         if (*nbytes > m)
1245             *nbytes = m;
1246     }
1247
1248     if (!s->write_memblock) {
1249         s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1250         s->write_data = pa_memblock_acquire(s->write_memblock);
1251     }
1252
1253     *data = s->write_data;
1254     *nbytes = pa_memblock_get_length(s->write_memblock);
1255
1256     return 0;
1257 }
1258
1259 int pa_stream_cancel_write(
1260         pa_stream *s) {
1261
1262     pa_assert(s);
1263     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1264
1265     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1266     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1267     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1268     PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1269
1270     pa_assert(s->write_data);
1271
1272     pa_memblock_release(s->write_memblock);
1273     pa_memblock_unref(s->write_memblock);
1274     s->write_memblock = NULL;
1275     s->write_data = NULL;
1276
1277     return 0;
1278 }
1279
1280 int pa_stream_write(
1281         pa_stream *s,
1282         const void *data,
1283         size_t length,
1284         pa_free_cb_t free_cb,
1285         int64_t offset,
1286         pa_seek_mode_t seek) {
1287
1288     pa_assert(s);
1289     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1290     pa_assert(data);
1291
1292     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1293     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1294     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1295     PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1296     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1297     PA_CHECK_VALIDITY(s->context,
1298                       !s->write_memblock ||
1299                       ((data >= s->write_data) &&
1300                        ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1301                       PA_ERR_INVALID);
1302     PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1303
1304     if (s->write_memblock) {
1305         pa_memchunk chunk;
1306
1307         /* pa_stream_write_begin() was called before */
1308
1309         pa_memblock_release(s->write_memblock);
1310
1311         chunk.memblock = s->write_memblock;
1312         chunk.index = (const char *) data - (const char *) s->write_data;
1313         chunk.length = length;
1314
1315         s->write_memblock = NULL;
1316         s->write_data = NULL;
1317
1318         pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1319         pa_memblock_unref(chunk.memblock);
1320
1321     } else {
1322         pa_seek_mode_t t_seek = seek;
1323         int64_t t_offset = offset;
1324         size_t t_length = length;
1325         const void *t_data = data;
1326
1327         /* pa_stream_write_begin() was not called before */
1328
1329         while (t_length > 0) {
1330             pa_memchunk chunk;
1331
1332             chunk.index = 0;
1333
1334             if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1335                 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1336                 chunk.length = t_length;
1337             } else {
1338                 void *d;
1339
1340                 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1341                 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1342
1343                 d = pa_memblock_acquire(chunk.memblock);
1344                 memcpy(d, t_data, chunk.length);
1345                 pa_memblock_release(chunk.memblock);
1346             }
1347
1348             pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1349
1350             t_offset = 0;
1351             t_seek = PA_SEEK_RELATIVE;
1352
1353             t_data = (const uint8_t*) t_data + chunk.length;
1354             t_length -= chunk.length;
1355
1356             pa_memblock_unref(chunk.memblock);
1357         }
1358
1359         if (free_cb && pa_pstream_get_shm(s->context->pstream))
1360             free_cb((void*) data);
1361     }
1362
1363     /* This is obviously wrong since we ignore the seeking index . But
1364      * that's OK, the server side applies the same error */
1365     s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1366
1367     /* pa_log("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes); */
1368
1369     if (s->direction == PA_STREAM_PLAYBACK) {
1370
1371         /* Update latency request correction */
1372         if (s->write_index_corrections[s->current_write_index_correction].valid) {
1373
1374             if (seek == PA_SEEK_ABSOLUTE) {
1375                 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1376                 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1377                 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1378             } else if (seek == PA_SEEK_RELATIVE) {
1379                 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1380                     s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1381             } else
1382                 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1383         }
1384
1385         /* Update the write index in the already available latency data */
1386         if (s->timing_info_valid) {
1387
1388             if (seek == PA_SEEK_ABSOLUTE) {
1389                 s->timing_info.write_index_corrupt = FALSE;
1390                 s->timing_info.write_index = offset + (int64_t) length;
1391             } else if (seek == PA_SEEK_RELATIVE) {
1392                 if (!s->timing_info.write_index_corrupt)
1393                     s->timing_info.write_index += offset + (int64_t) length;
1394             } else
1395                 s->timing_info.write_index_corrupt = TRUE;
1396         }
1397
1398         if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1399             request_auto_timing_update(s, TRUE);
1400     }
1401
1402     return 0;
1403 }
1404
1405 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1406     pa_assert(s);
1407     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1408     pa_assert(data);
1409     pa_assert(length);
1410
1411     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1412     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1413     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1414
1415     if (!s->peek_memchunk.memblock) {
1416
1417         if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1418             *data = NULL;
1419             *length = 0;
1420             return 0;
1421         }
1422
1423         s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1424     }
1425
1426     pa_assert(s->peek_data);
1427     *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1428     *length = s->peek_memchunk.length;
1429     return 0;
1430 }
1431
1432 int pa_stream_drop(pa_stream *s) {
1433     pa_assert(s);
1434     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1435
1436     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1437     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1438     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1439     PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1440
1441     pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1442
1443     /* Fix the simulated local read index */
1444     if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1445         s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1446
1447     pa_assert(s->peek_data);
1448     pa_memblock_release(s->peek_memchunk.memblock);
1449     pa_memblock_unref(s->peek_memchunk.memblock);
1450     pa_memchunk_reset(&s->peek_memchunk);
1451
1452     return 0;
1453 }
1454
1455 size_t pa_stream_writable_size(pa_stream *s) {
1456     pa_assert(s);
1457     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1458
1459     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1460     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1461     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1462
1463     return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1464 }
1465
1466 size_t pa_stream_readable_size(pa_stream *s) {
1467     pa_assert(s);
1468     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1469
1470     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1471     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1472     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1473
1474     return pa_memblockq_get_length(s->record_memblockq);
1475 }
1476
1477 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1478     pa_operation *o;
1479     pa_tagstruct *t;
1480     uint32_t tag;
1481
1482     pa_assert(s);
1483     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1484
1485     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1486     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1487     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1488
1489     /* Ask for a timing update before we cork/uncork to get the best
1490      * accuracy for the transport latency suitable for the
1491      * check_smoother_status() call in the started callback */
1492     request_auto_timing_update(s, TRUE);
1493
1494     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1495
1496     t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1497     pa_tagstruct_putu32(t, s->channel);
1498     pa_pstream_send_tagstruct(s->context->pstream, t);
1499     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1500
1501     /* This might cause the read index to conitnue again, hence
1502      * let's request a timing update */
1503     request_auto_timing_update(s, TRUE);
1504
1505     return o;
1506 }
1507
1508 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1509     pa_usec_t usec;
1510
1511     pa_assert(s);
1512     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1513     pa_assert(s->state == PA_STREAM_READY);
1514     pa_assert(s->direction != PA_STREAM_UPLOAD);
1515     pa_assert(s->timing_info_valid);
1516     pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1517     pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1518
1519     if (s->direction == PA_STREAM_PLAYBACK) {
1520         /* The last byte that was written into the output device
1521          * had this time value associated */
1522         usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1523
1524         if (!s->corked && !s->suspended) {
1525
1526             if (!ignore_transport)
1527                 /* Because the latency info took a little time to come
1528                  * to us, we assume that the real output time is actually
1529                  * a little ahead */
1530                 usec += s->timing_info.transport_usec;
1531
1532             /* However, the output device usually maintains a buffer
1533                too, hence the real sample currently played is a little
1534                back  */
1535             if (s->timing_info.sink_usec >= usec)
1536                 usec = 0;
1537             else
1538                 usec -= s->timing_info.sink_usec;
1539         }
1540
1541     } else {
1542         pa_assert(s->direction == PA_STREAM_RECORD);
1543
1544         /* The last byte written into the server side queue had
1545          * this time value associated */
1546         usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1547
1548         if (!s->corked && !s->suspended) {
1549
1550             if (!ignore_transport)
1551                 /* Add transport latency */
1552                 usec += s->timing_info.transport_usec;
1553
1554             /* Add latency of data in device buffer */
1555             usec += s->timing_info.source_usec;
1556
1557             /* If this is a monitor source, we need to correct the
1558              * time by the playback device buffer */
1559             if (s->timing_info.sink_usec >= usec)
1560                 usec = 0;
1561             else
1562                 usec -= s->timing_info.sink_usec;
1563         }
1564     }
1565
1566     return usec;
1567 }
1568
1569 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1570     pa_operation *o = userdata;
1571     struct timeval local, remote, now;
1572     pa_timing_info *i;
1573     pa_bool_t playing = FALSE;
1574     uint64_t underrun_for = 0, playing_for = 0;
1575
1576     pa_assert(pd);
1577     pa_assert(o);
1578     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1579
1580     if (!o->context || !o->stream)
1581         goto finish;
1582
1583     i = &o->stream->timing_info;
1584
1585     o->stream->timing_info_valid = FALSE;
1586     i->write_index_corrupt = TRUE;
1587     i->read_index_corrupt = TRUE;
1588
1589     if (command != PA_COMMAND_REPLY) {
1590         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1591             goto finish;
1592
1593     } else {
1594
1595         if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1596             pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1597             pa_tagstruct_get_boolean(t, &playing) < 0 ||
1598             pa_tagstruct_get_timeval(t, &local) < 0 ||
1599             pa_tagstruct_get_timeval(t, &remote) < 0 ||
1600             pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1601             pa_tagstruct_gets64(t, &i->read_index) < 0) {
1602
1603             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1604             goto finish;
1605         }
1606
1607         if (o->context->version >= 13 &&
1608             o->stream->direction == PA_STREAM_PLAYBACK)
1609             if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1610                 pa_tagstruct_getu64(t, &playing_for) < 0) {
1611
1612                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1613                 goto finish;
1614             }
1615
1616
1617         if (!pa_tagstruct_eof(t)) {
1618             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1619             goto finish;
1620         }
1621         o->stream->timing_info_valid = TRUE;
1622         i->write_index_corrupt = FALSE;
1623         i->read_index_corrupt = FALSE;
1624
1625         i->playing = (int) playing;
1626         i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1627
1628         pa_gettimeofday(&now);
1629
1630         /* Calculcate timestamps */
1631         if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1632             /* local and remote seem to have synchronized clocks */
1633
1634             if (o->stream->direction == PA_STREAM_PLAYBACK)
1635                 i->transport_usec = pa_timeval_diff(&remote, &local);
1636             else
1637                 i->transport_usec = pa_timeval_diff(&now, &remote);
1638
1639             i->synchronized_clocks = TRUE;
1640             i->timestamp = remote;
1641         } else {
1642             /* clocks are not synchronized, let's estimate latency then */
1643             i->transport_usec = pa_timeval_diff(&now, &local)/2;
1644             i->synchronized_clocks = FALSE;
1645             i->timestamp = local;
1646             pa_timeval_add(&i->timestamp, i->transport_usec);
1647         }
1648
1649         /* Invalidate read and write indexes if necessary */
1650         if (tag < o->stream->read_index_not_before)
1651             i->read_index_corrupt = TRUE;
1652
1653         if (tag < o->stream->write_index_not_before)
1654             i->write_index_corrupt = TRUE;
1655
1656         if (o->stream->direction == PA_STREAM_PLAYBACK) {
1657             /* Write index correction */
1658
1659             int n, j;
1660             uint32_t ctag = tag;
1661
1662             /* Go through the saved correction values and add up the
1663              * total correction.*/
1664             for (n = 0, j = o->stream->current_write_index_correction+1;
1665                  n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1666                  n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1667
1668                 /* Step over invalid data or out-of-date data */
1669                 if (!o->stream->write_index_corrections[j].valid ||
1670                     o->stream->write_index_corrections[j].tag < ctag)
1671                     continue;
1672
1673                 /* Make sure that everything is in order */
1674                 ctag = o->stream->write_index_corrections[j].tag+1;
1675
1676                 /* Now fix the write index */
1677                 if (o->stream->write_index_corrections[j].corrupt) {
1678                     /* A corrupting seek was made */
1679                     i->write_index_corrupt = TRUE;
1680                 } else if (o->stream->write_index_corrections[j].absolute) {
1681                     /* An absolute seek was made */
1682                     i->write_index = o->stream->write_index_corrections[j].value;
1683                     i->write_index_corrupt = FALSE;
1684                 } else if (!i->write_index_corrupt) {
1685                     /* A relative seek was made */
1686                     i->write_index += o->stream->write_index_corrections[j].value;
1687                 }
1688             }
1689
1690             /* Clear old correction entries */
1691             for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1692                 if (!o->stream->write_index_corrections[n].valid)
1693                     continue;
1694
1695                 if (o->stream->write_index_corrections[n].tag <= tag)
1696                     o->stream->write_index_corrections[n].valid = FALSE;
1697             }
1698         }
1699
1700         if (o->stream->direction == PA_STREAM_RECORD) {
1701             /* Read index correction */
1702
1703             if (!i->read_index_corrupt)
1704                 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1705         }
1706
1707         /* Update smoother */
1708         if (o->stream->smoother) {
1709             pa_usec_t u, x;
1710
1711             u = x = pa_rtclock_now() - i->transport_usec;
1712
1713             if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1714                 pa_usec_t su;
1715
1716                 /* If we weren't playing then it will take some time
1717                  * until the audio will actually come out through the
1718                  * speakers. Since we follow that timing here, we need
1719                  * to try to fix this up */
1720
1721                 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1722
1723                 if (su < i->sink_usec)
1724                     x += i->sink_usec - su;
1725             }
1726
1727             if (!i->playing)
1728                 pa_smoother_pause(o->stream->smoother, x);
1729
1730             /* Update the smoother */
1731             if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1732                 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1733                 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1734
1735             if (i->playing)
1736                 pa_smoother_resume(o->stream->smoother, x, TRUE);
1737         }
1738     }
1739
1740     o->stream->auto_timing_update_requested = FALSE;
1741
1742     if (o->stream->latency_update_callback)
1743         o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1744
1745     if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1746         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1747         cb(o->stream, o->stream->timing_info_valid, o->userdata);
1748     }
1749
1750 finish:
1751
1752     pa_operation_done(o);
1753     pa_operation_unref(o);
1754 }
1755
1756 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1757     uint32_t tag;
1758     pa_operation *o;
1759     pa_tagstruct *t;
1760     struct timeval now;
1761     int cidx = 0;
1762
1763     pa_assert(s);
1764     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1765
1766     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1767     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1768     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1769
1770     if (s->direction == PA_STREAM_PLAYBACK) {
1771         /* Find a place to store the write_index correction data for this entry */
1772         cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1773
1774         /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1775         PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1776     }
1777     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1778
1779     t = pa_tagstruct_command(
1780             s->context,
1781             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1782             &tag);
1783     pa_tagstruct_putu32(t, s->channel);
1784     pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1785
1786     pa_pstream_send_tagstruct(s->context->pstream, t);
1787     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1788
1789     if (s->direction == PA_STREAM_PLAYBACK) {
1790         /* Fill in initial correction data */
1791
1792         s->current_write_index_correction = cidx;
1793
1794         s->write_index_corrections[cidx].valid = TRUE;
1795         s->write_index_corrections[cidx].absolute = FALSE;
1796         s->write_index_corrections[cidx].corrupt = FALSE;
1797         s->write_index_corrections[cidx].tag = tag;
1798         s->write_index_corrections[cidx].value = 0;
1799     }
1800
1801     return o;
1802 }
1803
1804 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1805     pa_stream *s = userdata;
1806
1807     pa_assert(pd);
1808     pa_assert(s);
1809     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1810
1811     pa_stream_ref(s);
1812
1813     if (command != PA_COMMAND_REPLY) {
1814         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1815             goto finish;
1816
1817         pa_stream_set_state(s, PA_STREAM_FAILED);
1818         goto finish;
1819     } else if (!pa_tagstruct_eof(t)) {
1820         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1821         goto finish;
1822     }
1823
1824     pa_stream_set_state(s, PA_STREAM_TERMINATED);
1825
1826 finish:
1827     pa_stream_unref(s);
1828 }
1829
1830 int pa_stream_disconnect(pa_stream *s) {
1831     pa_tagstruct *t;
1832     uint32_t tag;
1833
1834     pa_assert(s);
1835     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1836
1837     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1838     PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1839     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1840
1841     pa_stream_ref(s);
1842
1843     t = pa_tagstruct_command(
1844             s->context,
1845             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1846                         (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1847             &tag);
1848     pa_tagstruct_putu32(t, s->channel);
1849     pa_pstream_send_tagstruct(s->context->pstream, t);
1850     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1851
1852     pa_stream_unref(s);
1853     return 0;
1854 }
1855
1856 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1857     pa_assert(s);
1858     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1859
1860     if (pa_detect_fork())
1861         return;
1862
1863     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1864         return;
1865
1866     s->read_callback = cb;
1867     s->read_userdata = userdata;
1868 }
1869
1870 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1871     pa_assert(s);
1872     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1873
1874     if (pa_detect_fork())
1875         return;
1876
1877     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1878         return;
1879
1880     s->write_callback = cb;
1881     s->write_userdata = userdata;
1882 }
1883
1884 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1885     pa_assert(s);
1886     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1887
1888     if (pa_detect_fork())
1889         return;
1890
1891     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1892         return;
1893
1894     s->state_callback = cb;
1895     s->state_userdata = userdata;
1896 }
1897
1898 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1899     pa_assert(s);
1900     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1901
1902     if (pa_detect_fork())
1903         return;
1904
1905     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1906         return;
1907
1908     s->overflow_callback = cb;
1909     s->overflow_userdata = userdata;
1910 }
1911
1912 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1913     pa_assert(s);
1914     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1915
1916     if (pa_detect_fork())
1917         return;
1918
1919     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1920         return;
1921
1922     s->underflow_callback = cb;
1923     s->underflow_userdata = userdata;
1924 }
1925
1926 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1927     pa_assert(s);
1928     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1929
1930     if (pa_detect_fork())
1931         return;
1932
1933     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1934         return;
1935
1936     s->latency_update_callback = cb;
1937     s->latency_update_userdata = userdata;
1938 }
1939
1940 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1941     pa_assert(s);
1942     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1943
1944     if (pa_detect_fork())
1945         return;
1946
1947     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1948         return;
1949
1950     s->moved_callback = cb;
1951     s->moved_userdata = userdata;
1952 }
1953
1954 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1955     pa_assert(s);
1956     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1957
1958     if (pa_detect_fork())
1959         return;
1960
1961     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1962         return;
1963
1964     s->suspended_callback = cb;
1965     s->suspended_userdata = userdata;
1966 }
1967
1968 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1969     pa_assert(s);
1970     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1971
1972     if (pa_detect_fork())
1973         return;
1974
1975     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1976         return;
1977
1978     s->started_callback = cb;
1979     s->started_userdata = userdata;
1980 }
1981
1982 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
1983     pa_assert(s);
1984     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1985
1986     if (pa_detect_fork())
1987         return;
1988
1989     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1990         return;
1991
1992     s->event_callback = cb;
1993     s->event_userdata = userdata;
1994 }
1995
1996 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1997     pa_assert(s);
1998     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1999
2000     if (pa_detect_fork())
2001         return;
2002
2003     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2004         return;
2005
2006     s->buffer_attr_callback = cb;
2007     s->buffer_attr_userdata = userdata;
2008 }
2009
2010 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2011     pa_operation *o = userdata;
2012     int success = 1;
2013
2014     pa_assert(pd);
2015     pa_assert(o);
2016     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2017
2018     if (!o->context)
2019         goto finish;
2020
2021     if (command != PA_COMMAND_REPLY) {
2022         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2023             goto finish;
2024
2025         success = 0;
2026     } else if (!pa_tagstruct_eof(t)) {
2027         pa_context_fail(o->context, PA_ERR_PROTOCOL);
2028         goto finish;
2029     }
2030
2031     if (o->callback) {
2032         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2033         cb(o->stream, success, o->userdata);
2034     }
2035
2036 finish:
2037     pa_operation_done(o);
2038     pa_operation_unref(o);
2039 }
2040
2041 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2042     pa_operation *o;
2043     pa_tagstruct *t;
2044     uint32_t tag;
2045
2046     pa_assert(s);
2047     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2048
2049     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2050     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2051     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2052
2053     /* Ask for a timing update before we cork/uncork to get the best
2054      * accuracy for the transport latency suitable for the
2055      * check_smoother_status() call in the started callback */
2056     request_auto_timing_update(s, TRUE);
2057
2058     s->corked = b;
2059
2060     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2061
2062     t = pa_tagstruct_command(
2063             s->context,
2064             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2065             &tag);
2066     pa_tagstruct_putu32(t, s->channel);
2067     pa_tagstruct_put_boolean(t, !!b);
2068     pa_pstream_send_tagstruct(s->context->pstream, t);
2069     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2070
2071     check_smoother_status(s, FALSE, FALSE, FALSE);
2072
2073     /* This might cause the indexes to hang/start again, hence let's
2074      * request a timing update, after the cork/uncork, too */
2075     request_auto_timing_update(s, TRUE);
2076
2077     return o;
2078 }
2079
2080 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2081     pa_tagstruct *t;
2082     pa_operation *o;
2083     uint32_t tag;
2084
2085     pa_assert(s);
2086     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2087
2088     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2089     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2090
2091     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2092
2093     t = pa_tagstruct_command(s->context, command, &tag);
2094     pa_tagstruct_putu32(t, s->channel);
2095     pa_pstream_send_tagstruct(s->context->pstream, t);
2096     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2097
2098     return o;
2099 }
2100
2101 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2102     pa_operation *o;
2103
2104     pa_assert(s);
2105     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2106
2107     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2108     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2109     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2110
2111     /* Ask for a timing update *before* the flush, so that the
2112      * transport usec is as up to date as possible when we get the
2113      * underflow message and update the smoother status*/
2114     request_auto_timing_update(s, TRUE);
2115
2116     if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2117         return NULL;
2118
2119     if (s->direction == PA_STREAM_PLAYBACK) {
2120
2121         if (s->write_index_corrections[s->current_write_index_correction].valid)
2122             s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2123
2124         if (s->buffer_attr.prebuf > 0)
2125             check_smoother_status(s, FALSE, FALSE, TRUE);
2126
2127         /* This will change the write index, but leave the
2128          * read index untouched. */
2129         invalidate_indexes(s, FALSE, TRUE);
2130
2131     } else
2132         /* For record streams this has no influence on the write
2133          * index, but the read index might jump. */
2134         invalidate_indexes(s, TRUE, FALSE);
2135
2136     return o;
2137 }
2138
2139 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2140     pa_operation *o;
2141
2142     pa_assert(s);
2143     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2144
2145     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2146     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2147     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2148     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2149
2150     /* Ask for a timing update before we cork/uncork to get the best
2151      * accuracy for the transport latency suitable for the
2152      * check_smoother_status() call in the started callback */
2153     request_auto_timing_update(s, TRUE);
2154
2155     if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2156         return NULL;
2157
2158     /* This might cause the read index to hang again, hence
2159      * let's request a timing update */
2160     request_auto_timing_update(s, TRUE);
2161
2162     return o;
2163 }
2164
2165 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2166     pa_operation *o;
2167
2168     pa_assert(s);
2169     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2170
2171     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2172     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2173     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2174     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2175
2176     /* Ask for a timing update before we cork/uncork to get the best
2177      * accuracy for the transport latency suitable for the
2178      * check_smoother_status() call in the started callback */
2179     request_auto_timing_update(s, TRUE);
2180
2181     if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2182         return NULL;
2183
2184     /* This might cause the read index to start moving again, hence
2185      * let's request a timing update */
2186     request_auto_timing_update(s, TRUE);
2187
2188     return o;
2189 }
2190
2191 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2192     pa_operation *o;
2193
2194     pa_assert(s);
2195     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2196     pa_assert(name);
2197
2198     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2199     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2200     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2201
2202     if (s->context->version >= 13) {
2203         pa_proplist *p = pa_proplist_new();
2204
2205         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2206         o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2207         pa_proplist_free(p);
2208     } else {
2209         pa_tagstruct *t;
2210         uint32_t tag;
2211
2212         o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2213         t = pa_tagstruct_command(
2214                 s->context,
2215                 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2216                 &tag);
2217         pa_tagstruct_putu32(t, s->channel);
2218         pa_tagstruct_puts(t, name);
2219         pa_pstream_send_tagstruct(s->context->pstream, t);
2220         pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2221     }
2222
2223     return o;
2224 }
2225
2226 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2227     pa_usec_t usec;
2228
2229     pa_assert(s);
2230     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2231
2232     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2233     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2234     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2235     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2236     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2237     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2238
2239     if (s->smoother)
2240         usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2241     else
2242         usec = calc_time(s, FALSE);
2243
2244     /* Make sure the time runs monotonically */
2245     if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2246         if (usec < s->previous_time)
2247             usec = s->previous_time;
2248         else
2249             s->previous_time = usec;
2250     }
2251
2252     if (r_usec)
2253         *r_usec = usec;
2254
2255     return 0;
2256 }
2257
2258 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2259     pa_assert(s);
2260     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2261
2262     if (negative)
2263         *negative = 0;
2264
2265     if (a >= b)
2266         return a-b;
2267     else {
2268         if (negative && s->direction == PA_STREAM_RECORD) {
2269             *negative = 1;
2270             return b-a;
2271         } else
2272             return 0;
2273     }
2274 }
2275
2276 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2277     pa_usec_t t, c;
2278     int r;
2279     int64_t cindex;
2280
2281     pa_assert(s);
2282     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2283     pa_assert(r_usec);
2284
2285     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2286     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2287     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2288     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2289     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2290     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2291
2292     if ((r = pa_stream_get_time(s, &t)) < 0)
2293         return r;
2294
2295     if (s->direction == PA_STREAM_PLAYBACK)
2296         cindex = s->timing_info.write_index;
2297     else
2298         cindex = s->timing_info.read_index;
2299
2300     if (cindex < 0)
2301         cindex = 0;
2302
2303     c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2304
2305     if (s->direction == PA_STREAM_PLAYBACK)
2306         *r_usec = time_counter_diff(s, c, t, negative);
2307     else
2308         *r_usec = time_counter_diff(s, t, c, negative);
2309
2310     return 0;
2311 }
2312
2313 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2314     pa_assert(s);
2315     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2316
2317     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2318     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2319     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2320     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2321
2322     return &s->timing_info;
2323 }
2324
2325 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2326     pa_assert(s);
2327     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2328
2329     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2330
2331     return &s->sample_spec;
2332 }
2333
2334 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2335     pa_assert(s);
2336     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2337
2338     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2339
2340     return &s->channel_map;
2341 }
2342
2343 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2344     pa_assert(s);
2345     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2346
2347     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2348     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2349     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2350
2351     return &s->buffer_attr;
2352 }
2353
2354 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2355     pa_operation *o = userdata;
2356     int success = 1;
2357
2358     pa_assert(pd);
2359     pa_assert(o);
2360     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2361
2362     if (!o->context)
2363         goto finish;
2364
2365     if (command != PA_COMMAND_REPLY) {
2366         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2367             goto finish;
2368
2369         success = 0;
2370     } else {
2371         if (o->stream->direction == PA_STREAM_PLAYBACK) {
2372             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2373                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2374                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2375                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2376                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2377                 goto finish;
2378             }
2379         } else if (o->stream->direction == PA_STREAM_RECORD) {
2380             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2381                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2382                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2383                 goto finish;
2384             }
2385         }
2386
2387         if (o->stream->context->version >= 13) {
2388             pa_usec_t usec;
2389
2390             if (pa_tagstruct_get_usec(t, &usec) < 0) {
2391                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2392                 goto finish;
2393             }
2394
2395             if (o->stream->direction == PA_STREAM_RECORD)
2396                 o->stream->timing_info.configured_source_usec = usec;
2397             else
2398                 o->stream->timing_info.configured_sink_usec = usec;
2399         }
2400
2401         if (!pa_tagstruct_eof(t)) {
2402             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2403             goto finish;
2404         }
2405     }
2406
2407     if (o->callback) {
2408         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2409         cb(o->stream, success, o->userdata);
2410     }
2411
2412 finish:
2413     pa_operation_done(o);
2414     pa_operation_unref(o);
2415 }
2416
2417
2418 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2419     pa_operation *o;
2420     pa_tagstruct *t;
2421     uint32_t tag;
2422
2423     pa_assert(s);
2424     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2425     pa_assert(attr);
2426
2427     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2428     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2429     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2430     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2431
2432     /* Ask for a timing update before we cork/uncork to get the best
2433      * accuracy for the transport latency suitable for the
2434      * check_smoother_status() call in the started callback */
2435     request_auto_timing_update(s, TRUE);
2436
2437     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2438
2439     t = pa_tagstruct_command(
2440             s->context,
2441             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2442             &tag);
2443     pa_tagstruct_putu32(t, s->channel);
2444
2445     pa_tagstruct_putu32(t, attr->maxlength);
2446
2447     if (s->direction == PA_STREAM_PLAYBACK)
2448         pa_tagstruct_put(
2449                 t,
2450                 PA_TAG_U32, attr->tlength,
2451                 PA_TAG_U32, attr->prebuf,
2452                 PA_TAG_U32, attr->minreq,
2453                 PA_TAG_INVALID);
2454     else
2455         pa_tagstruct_putu32(t, attr->fragsize);
2456
2457     if (s->context->version >= 13)
2458         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2459
2460     if (s->context->version >= 14)
2461         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2462
2463     pa_pstream_send_tagstruct(s->context->pstream, t);
2464     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2465
2466     /* This might cause changes in the read/write indexex, hence let's
2467      * request a timing update */
2468     request_auto_timing_update(s, TRUE);
2469
2470     return o;
2471 }
2472
2473 uint32_t pa_stream_get_device_index(pa_stream *s) {
2474     pa_assert(s);
2475     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2476
2477     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2478     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2479     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2480     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2481     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2482
2483     return s->device_index;
2484 }
2485
2486 const char *pa_stream_get_device_name(pa_stream *s) {
2487     pa_assert(s);
2488     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2489
2490     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2491     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2492     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2493     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2494     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2495
2496     return s->device_name;
2497 }
2498
2499 int pa_stream_is_suspended(pa_stream *s) {
2500     pa_assert(s);
2501     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2502
2503     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2504     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2505     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2506     PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2507
2508     return s->suspended;
2509 }
2510
2511 int pa_stream_is_corked(pa_stream *s) {
2512     pa_assert(s);
2513     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2514
2515     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2516     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2517     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2518
2519     return s->corked;
2520 }
2521
2522 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2523     pa_operation *o = userdata;
2524     int success = 1;
2525
2526     pa_assert(pd);
2527     pa_assert(o);
2528     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2529
2530     if (!o->context)
2531         goto finish;
2532
2533     if (command != PA_COMMAND_REPLY) {
2534         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2535             goto finish;
2536
2537         success = 0;
2538     } else {
2539
2540         if (!pa_tagstruct_eof(t)) {
2541             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2542             goto finish;
2543         }
2544     }
2545
2546     o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2547     pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2548
2549     if (o->callback) {
2550         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2551         cb(o->stream, success, o->userdata);
2552     }
2553
2554 finish:
2555     pa_operation_done(o);
2556     pa_operation_unref(o);
2557 }
2558
2559
2560 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2561     pa_operation *o;
2562     pa_tagstruct *t;
2563     uint32_t tag;
2564
2565     pa_assert(s);
2566     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2567
2568     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2569     PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2570     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2571     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2572     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2573     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2574
2575     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2576     o->private = PA_UINT_TO_PTR(rate);
2577
2578     t = pa_tagstruct_command(
2579             s->context,
2580             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2581             &tag);
2582     pa_tagstruct_putu32(t, s->channel);
2583     pa_tagstruct_putu32(t, rate);
2584
2585     pa_pstream_send_tagstruct(s->context->pstream, t);
2586     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2587
2588     return o;
2589 }
2590
2591 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2592     pa_operation *o;
2593     pa_tagstruct *t;
2594     uint32_t tag;
2595
2596     pa_assert(s);
2597     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2598
2599     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2600     PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2601     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2602     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2603     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2604
2605     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2606
2607     t = pa_tagstruct_command(
2608             s->context,
2609             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2610             &tag);
2611     pa_tagstruct_putu32(t, s->channel);
2612     pa_tagstruct_putu32(t, (uint32_t) mode);
2613     pa_tagstruct_put_proplist(t, p);
2614
2615     pa_pstream_send_tagstruct(s->context->pstream, t);
2616     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2617
2618     /* Please note that we don't update s->proplist here, because we
2619      * don't export that field */
2620
2621     return o;
2622 }
2623
2624 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2625     pa_operation *o;
2626     pa_tagstruct *t;
2627     uint32_t tag;
2628     const char * const*k;
2629
2630     pa_assert(s);
2631     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2632
2633     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2634     PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2635     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2636     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2637     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2638
2639     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2640
2641     t = pa_tagstruct_command(
2642             s->context,
2643             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2644             &tag);
2645     pa_tagstruct_putu32(t, s->channel);
2646
2647     for (k = keys; *k; k++)
2648         pa_tagstruct_puts(t, *k);
2649
2650     pa_tagstruct_puts(t, NULL);
2651
2652     pa_pstream_send_tagstruct(s->context->pstream, t);
2653     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2654
2655     /* Please note that we don't update s->proplist here, because we
2656      * don't export that field */
2657
2658     return o;
2659 }
2660
2661 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2662     pa_assert(s);
2663     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2664
2665     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2666     PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2667     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2668     PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2669
2670     s->direct_on_input = sink_input_idx;
2671
2672     return 0;
2673 }
2674
2675 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2676     pa_assert(s);
2677     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2678
2679     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2680     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2681     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2682
2683     return s->direct_on_input;
2684 }