Remove libatomic_ops dependency
[profile/ivi/pulseaudio-panda.git] / src / pulse / stream.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/rtclock.h>
34 #include <pulse/xmalloc.h>
35 #include <pulse/fork-detect.h>
36
37 #include <pulsecore/pstream-util.h>
38 #include <pulsecore/log.h>
39 #include <pulsecore/hashmap.h>
40 #include <pulsecore/macro.h>
41 #include <pulsecore/core-rtclock.h>
42 #include <pulsecore/core-util.h>
43
44 #include "internal.h"
45 #include "stream.h"
46
47 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
48 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
49
50 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
51 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
52 #define SMOOTHER_MIN_HISTORY (4)
53
54 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
55     return pa_stream_new_with_proplist(c, name, ss, map, NULL);
56 }
57
58 static void reset_callbacks(pa_stream *s) {
59     s->read_callback = NULL;
60     s->read_userdata = NULL;
61     s->write_callback = NULL;
62     s->write_userdata = NULL;
63     s->state_callback = NULL;
64     s->state_userdata = NULL;
65     s->overflow_callback = NULL;
66     s->overflow_userdata = NULL;
67     s->underflow_callback = NULL;
68     s->underflow_userdata = NULL;
69     s->latency_update_callback = NULL;
70     s->latency_update_userdata = NULL;
71     s->moved_callback = NULL;
72     s->moved_userdata = NULL;
73     s->suspended_callback = NULL;
74     s->suspended_userdata = NULL;
75     s->started_callback = NULL;
76     s->started_userdata = NULL;
77     s->event_callback = NULL;
78     s->event_userdata = NULL;
79     s->buffer_attr_callback = NULL;
80     s->buffer_attr_userdata = NULL;
81 }
82
83 static pa_stream *pa_stream_new_with_proplist_internal(
84         pa_context *c,
85         const char *name,
86         const pa_sample_spec *ss,
87         const pa_channel_map *map,
88         pa_format_info * const *formats,
89         unsigned int n_formats,
90         pa_proplist *p) {
91
92     pa_stream *s;
93     unsigned int i;
94
95     pa_assert(c);
96     pa_assert(PA_REFCNT_VALUE(c) >= 1);
97     pa_assert((ss == NULL && map == NULL) || (formats == NULL && n_formats == 0));
98     pa_assert(n_formats < PA_MAX_FORMATS);
99
100     PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
101     PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
102
103     s = pa_xnew(pa_stream, 1);
104     PA_REFCNT_INIT(s);
105     s->context = c;
106     s->mainloop = c->mainloop;
107
108     s->direction = PA_STREAM_NODIRECTION;
109     s->state = PA_STREAM_UNCONNECTED;
110     s->flags = 0;
111
112     if (ss)
113         s->sample_spec = *ss;
114     else
115         pa_sample_spec_init(&s->sample_spec);
116
117     if (map)
118         s->channel_map = *map;
119     else
120         pa_channel_map_init(&s->channel_map);
121
122     s->n_formats = 0;
123     if (formats) {
124         s->n_formats = n_formats;
125         for (i = 0; i < n_formats; i++)
126             s->req_formats[i] = pa_format_info_copy(formats[i]);
127     }
128
129     /* We'll get the final negotiated format after connecting */
130     s->format = NULL;
131
132     s->direct_on_input = PA_INVALID_INDEX;
133
134     s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
135     if (name)
136         pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
137
138     s->channel = 0;
139     s->channel_valid = FALSE;
140     s->syncid = c->csyncid++;
141     s->stream_index = PA_INVALID_INDEX;
142
143     s->requested_bytes = 0;
144     memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
145
146     /* We initialize the target length here, so that if the user
147      * passes no explicit buffering metrics the default is similar to
148      * what older PA versions provided. */
149
150     s->buffer_attr.maxlength = (uint32_t) -1;
151     if (ss)
152         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
153     else {
154         /* FIXME: We assume a worst-case compressed format corresponding to
155          * 48000 Hz, 2 ch, S16 PCM, but this can very well be incorrect */
156         pa_sample_spec tmp_ss = {
157             .format   = PA_SAMPLE_S16NE,
158             .rate     = 48000,
159             .channels = 2,
160         };
161         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &tmp_ss); /* 250ms of buffering */
162     }
163     s->buffer_attr.minreq = (uint32_t) -1;
164     s->buffer_attr.prebuf = (uint32_t) -1;
165     s->buffer_attr.fragsize = (uint32_t) -1;
166
167     s->device_index = PA_INVALID_INDEX;
168     s->device_name = NULL;
169     s->suspended = FALSE;
170     s->corked = FALSE;
171
172     s->write_memblock = NULL;
173     s->write_data = NULL;
174
175     pa_memchunk_reset(&s->peek_memchunk);
176     s->peek_data = NULL;
177     s->record_memblockq = NULL;
178
179     memset(&s->timing_info, 0, sizeof(s->timing_info));
180     s->timing_info_valid = FALSE;
181
182     s->previous_time = 0;
183     s->latest_underrun_at_index = -1;
184
185     s->read_index_not_before = 0;
186     s->write_index_not_before = 0;
187     for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
188         s->write_index_corrections[i].valid = 0;
189     s->current_write_index_correction = 0;
190
191     s->auto_timing_update_event = NULL;
192     s->auto_timing_update_requested = FALSE;
193     s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
194
195     reset_callbacks(s);
196
197     s->smoother = NULL;
198
199     /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
200     PA_LLIST_PREPEND(pa_stream, c->streams, s);
201     pa_stream_ref(s);
202
203     return s;
204 }
205
206 pa_stream *pa_stream_new_with_proplist(
207         pa_context *c,
208         const char *name,
209         const pa_sample_spec *ss,
210         const pa_channel_map *map,
211         pa_proplist *p) {
212
213     pa_channel_map tmap;
214
215     PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
216     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
217     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
218     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
219     PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
220
221     if (!map)
222         PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
223
224     return pa_stream_new_with_proplist_internal(c, name, ss, map, NULL, 0, p);
225 }
226
227 pa_stream *pa_stream_new_extended(
228         pa_context *c,
229         const char *name,
230         pa_format_info * const *formats,
231         unsigned int n_formats,
232         pa_proplist *p) {
233
234     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 21, PA_ERR_NOTSUPPORTED);
235
236     return pa_stream_new_with_proplist_internal(c, name, NULL, NULL, formats, n_formats, p);
237 }
238
239 static void stream_unlink(pa_stream *s) {
240     pa_operation *o, *n;
241     pa_assert(s);
242
243     if (!s->context)
244         return;
245
246     /* Detach from context */
247
248     /* Unref all operation objects that point to us */
249     for (o = s->context->operations; o; o = n) {
250         n = o->next;
251
252         if (o->stream == s)
253             pa_operation_cancel(o);
254     }
255
256     /* Drop all outstanding replies for this stream */
257     if (s->context->pdispatch)
258         pa_pdispatch_unregister_reply(s->context->pdispatch, s);
259
260     if (s->channel_valid) {
261         pa_hashmap_remove((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel));
262         s->channel = 0;
263         s->channel_valid = FALSE;
264     }
265
266     PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
267     pa_stream_unref(s);
268
269     s->context = NULL;
270
271     if (s->auto_timing_update_event) {
272         pa_assert(s->mainloop);
273         s->mainloop->time_free(s->auto_timing_update_event);
274     }
275
276     reset_callbacks(s);
277 }
278
279 static void stream_free(pa_stream *s) {
280     unsigned int i;
281
282     pa_assert(s);
283
284     stream_unlink(s);
285
286     if (s->write_memblock) {
287         if (s->write_data)
288             pa_memblock_release(s->write_memblock);
289         pa_memblock_unref(s->write_memblock);
290     }
291
292     if (s->peek_memchunk.memblock) {
293         if (s->peek_data)
294             pa_memblock_release(s->peek_memchunk.memblock);
295         pa_memblock_unref(s->peek_memchunk.memblock);
296     }
297
298     if (s->record_memblockq)
299         pa_memblockq_free(s->record_memblockq);
300
301     if (s->proplist)
302         pa_proplist_free(s->proplist);
303
304     if (s->smoother)
305         pa_smoother_free(s->smoother);
306
307     for (i = 0; i < s->n_formats; i++)
308         pa_format_info_free(s->req_formats[i]);
309
310     if (s->format)
311         pa_format_info_free(s->format);
312
313     pa_xfree(s->device_name);
314     pa_xfree(s);
315 }
316
317 void pa_stream_unref(pa_stream *s) {
318     pa_assert(s);
319     pa_assert(PA_REFCNT_VALUE(s) >= 1);
320
321     if (PA_REFCNT_DEC(s) <= 0)
322         stream_free(s);
323 }
324
325 pa_stream* pa_stream_ref(pa_stream *s) {
326     pa_assert(s);
327     pa_assert(PA_REFCNT_VALUE(s) >= 1);
328
329     PA_REFCNT_INC(s);
330     return s;
331 }
332
333 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
334     pa_assert(s);
335     pa_assert(PA_REFCNT_VALUE(s) >= 1);
336
337     return s->state;
338 }
339
340 pa_context* pa_stream_get_context(pa_stream *s) {
341     pa_assert(s);
342     pa_assert(PA_REFCNT_VALUE(s) >= 1);
343
344     return s->context;
345 }
346
347 uint32_t pa_stream_get_index(pa_stream *s) {
348     pa_assert(s);
349     pa_assert(PA_REFCNT_VALUE(s) >= 1);
350
351     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
352     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
353
354     return s->stream_index;
355 }
356
357 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
358     pa_assert(s);
359     pa_assert(PA_REFCNT_VALUE(s) >= 1);
360
361     if (s->state == st)
362         return;
363
364     pa_stream_ref(s);
365
366     s->state = st;
367
368     if (s->state_callback)
369         s->state_callback(s, s->state_userdata);
370
371     if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
372         stream_unlink(s);
373
374     pa_stream_unref(s);
375 }
376
377 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
378     pa_assert(s);
379     pa_assert(PA_REFCNT_VALUE(s) >= 1);
380
381     if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
382         return;
383
384     if (s->state == PA_STREAM_READY &&
385         (force || !s->auto_timing_update_requested)) {
386         pa_operation *o;
387
388 /*         pa_log("Automatically requesting new timing data"); */
389
390         if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
391             pa_operation_unref(o);
392             s->auto_timing_update_requested = TRUE;
393         }
394     }
395
396     if (s->auto_timing_update_event) {
397         if (s->suspended && !force) {
398             pa_assert(s->mainloop);
399             s->mainloop->time_free(s->auto_timing_update_event);
400             s->auto_timing_update_event = NULL;
401         } else {
402             if (force)
403                 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
404
405             pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
406
407             s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
408         }
409     }
410 }
411
412 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
413     pa_context *c = userdata;
414     pa_stream *s;
415     uint32_t channel;
416
417     pa_assert(pd);
418     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
419     pa_assert(t);
420     pa_assert(c);
421     pa_assert(PA_REFCNT_VALUE(c) >= 1);
422
423     pa_context_ref(c);
424
425     if (pa_tagstruct_getu32(t, &channel) < 0 ||
426         !pa_tagstruct_eof(t)) {
427         pa_context_fail(c, PA_ERR_PROTOCOL);
428         goto finish;
429     }
430
431     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
432         goto finish;
433
434     if (s->state != PA_STREAM_READY)
435         goto finish;
436
437     pa_context_set_error(c, PA_ERR_KILLED);
438     pa_stream_set_state(s, PA_STREAM_FAILED);
439
440 finish:
441     pa_context_unref(c);
442 }
443
444 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
445     pa_usec_t x;
446
447     pa_assert(s);
448     pa_assert(!force_start || !force_stop);
449
450     if (!s->smoother)
451         return;
452
453     x = pa_rtclock_now();
454
455     if (s->timing_info_valid) {
456         if (aposteriori)
457             x -= s->timing_info.transport_usec;
458         else
459             x += s->timing_info.transport_usec;
460     }
461
462     if (s->suspended || s->corked || force_stop)
463         pa_smoother_pause(s->smoother, x);
464     else if (force_start || s->buffer_attr.prebuf == 0) {
465
466         if (!s->timing_info_valid &&
467             !aposteriori &&
468             !force_start &&
469             !force_stop &&
470             s->context->version >= 13) {
471
472             /* If the server supports STARTED events we take them as
473              * indications when audio really starts/stops playing, if
474              * we don't have any timing info yet -- instead of trying
475              * to be smart and guessing the server time. Otherwise the
476              * unknown transport delay adds too much noise to our time
477              * calculations. */
478
479             return;
480         }
481
482         pa_smoother_resume(s->smoother, x, TRUE);
483     }
484
485     /* Please note that we have no idea if playback actually started
486      * if prebuf is non-zero! */
487 }
488
489 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata);
490
491 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
492     pa_context *c = userdata;
493     pa_stream *s;
494     uint32_t channel;
495     const char *dn;
496     pa_bool_t suspended;
497     uint32_t di;
498     pa_usec_t usec = 0;
499     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
500
501     pa_assert(pd);
502     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
503     pa_assert(t);
504     pa_assert(c);
505     pa_assert(PA_REFCNT_VALUE(c) >= 1);
506
507     pa_context_ref(c);
508
509     if (c->version < 12) {
510         pa_context_fail(c, PA_ERR_PROTOCOL);
511         goto finish;
512     }
513
514     if (pa_tagstruct_getu32(t, &channel) < 0 ||
515         pa_tagstruct_getu32(t, &di) < 0 ||
516         pa_tagstruct_gets(t, &dn) < 0 ||
517         pa_tagstruct_get_boolean(t, &suspended) < 0) {
518         pa_context_fail(c, PA_ERR_PROTOCOL);
519         goto finish;
520     }
521
522     if (c->version >= 13) {
523
524         if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
525             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
526                 pa_tagstruct_getu32(t, &fragsize) < 0 ||
527                 pa_tagstruct_get_usec(t, &usec) < 0) {
528                 pa_context_fail(c, PA_ERR_PROTOCOL);
529                 goto finish;
530             }
531         } else {
532             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
533                 pa_tagstruct_getu32(t, &tlength) < 0 ||
534                 pa_tagstruct_getu32(t, &prebuf) < 0 ||
535                 pa_tagstruct_getu32(t, &minreq) < 0 ||
536                 pa_tagstruct_get_usec(t, &usec) < 0) {
537                 pa_context_fail(c, PA_ERR_PROTOCOL);
538                 goto finish;
539             }
540         }
541     }
542
543     if (!pa_tagstruct_eof(t)) {
544         pa_context_fail(c, PA_ERR_PROTOCOL);
545         goto finish;
546     }
547
548     if (!dn || di == PA_INVALID_INDEX) {
549         pa_context_fail(c, PA_ERR_PROTOCOL);
550         goto finish;
551     }
552
553     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
554         goto finish;
555
556     if (s->state != PA_STREAM_READY)
557         goto finish;
558
559     if (c->version >= 13) {
560         if (s->direction == PA_STREAM_RECORD)
561             s->timing_info.configured_source_usec = usec;
562         else
563             s->timing_info.configured_sink_usec = usec;
564
565         s->buffer_attr.maxlength = maxlength;
566         s->buffer_attr.fragsize = fragsize;
567         s->buffer_attr.tlength = tlength;
568         s->buffer_attr.prebuf = prebuf;
569         s->buffer_attr.minreq = minreq;
570     }
571
572     pa_xfree(s->device_name);
573     s->device_name = pa_xstrdup(dn);
574     s->device_index = di;
575
576     s->suspended = suspended;
577
578     if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
579         s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
580         s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
581         request_auto_timing_update(s, TRUE);
582     }
583
584     check_smoother_status(s, TRUE, FALSE, FALSE);
585     request_auto_timing_update(s, TRUE);
586
587     if (s->moved_callback)
588         s->moved_callback(s, s->moved_userdata);
589
590 finish:
591     pa_context_unref(c);
592 }
593
594 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
595     pa_context *c = userdata;
596     pa_stream *s;
597     uint32_t channel;
598     pa_usec_t usec = 0;
599     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
600
601     pa_assert(pd);
602     pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
603     pa_assert(t);
604     pa_assert(c);
605     pa_assert(PA_REFCNT_VALUE(c) >= 1);
606
607     pa_context_ref(c);
608
609     if (c->version < 15) {
610         pa_context_fail(c, PA_ERR_PROTOCOL);
611         goto finish;
612     }
613
614     if (pa_tagstruct_getu32(t, &channel) < 0) {
615         pa_context_fail(c, PA_ERR_PROTOCOL);
616         goto finish;
617     }
618
619     if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
620         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
621             pa_tagstruct_getu32(t, &fragsize) < 0 ||
622             pa_tagstruct_get_usec(t, &usec) < 0) {
623             pa_context_fail(c, PA_ERR_PROTOCOL);
624             goto finish;
625         }
626     } else {
627         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
628             pa_tagstruct_getu32(t, &tlength) < 0 ||
629             pa_tagstruct_getu32(t, &prebuf) < 0 ||
630             pa_tagstruct_getu32(t, &minreq) < 0 ||
631             pa_tagstruct_get_usec(t, &usec) < 0) {
632             pa_context_fail(c, PA_ERR_PROTOCOL);
633             goto finish;
634         }
635     }
636
637     if (!pa_tagstruct_eof(t)) {
638         pa_context_fail(c, PA_ERR_PROTOCOL);
639         goto finish;
640     }
641
642     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
643         goto finish;
644
645     if (s->state != PA_STREAM_READY)
646         goto finish;
647
648     if (s->direction == PA_STREAM_RECORD)
649         s->timing_info.configured_source_usec = usec;
650     else
651         s->timing_info.configured_sink_usec = usec;
652
653     s->buffer_attr.maxlength = maxlength;
654     s->buffer_attr.fragsize = fragsize;
655     s->buffer_attr.tlength = tlength;
656     s->buffer_attr.prebuf = prebuf;
657     s->buffer_attr.minreq = minreq;
658
659     request_auto_timing_update(s, TRUE);
660
661     if (s->buffer_attr_callback)
662         s->buffer_attr_callback(s, s->buffer_attr_userdata);
663
664 finish:
665     pa_context_unref(c);
666 }
667
668 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
669     pa_context *c = userdata;
670     pa_stream *s;
671     uint32_t channel;
672     pa_bool_t suspended;
673
674     pa_assert(pd);
675     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
676     pa_assert(t);
677     pa_assert(c);
678     pa_assert(PA_REFCNT_VALUE(c) >= 1);
679
680     pa_context_ref(c);
681
682     if (c->version < 12) {
683         pa_context_fail(c, PA_ERR_PROTOCOL);
684         goto finish;
685     }
686
687     if (pa_tagstruct_getu32(t, &channel) < 0 ||
688         pa_tagstruct_get_boolean(t, &suspended) < 0 ||
689         !pa_tagstruct_eof(t)) {
690         pa_context_fail(c, PA_ERR_PROTOCOL);
691         goto finish;
692     }
693
694     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
695         goto finish;
696
697     if (s->state != PA_STREAM_READY)
698         goto finish;
699
700     s->suspended = suspended;
701
702     if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
703         s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
704         s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
705         request_auto_timing_update(s, TRUE);
706     }
707
708     check_smoother_status(s, TRUE, FALSE, FALSE);
709     request_auto_timing_update(s, TRUE);
710
711     if (s->suspended_callback)
712         s->suspended_callback(s, s->suspended_userdata);
713
714 finish:
715     pa_context_unref(c);
716 }
717
718 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
719     pa_context *c = userdata;
720     pa_stream *s;
721     uint32_t channel;
722
723     pa_assert(pd);
724     pa_assert(command == PA_COMMAND_STARTED);
725     pa_assert(t);
726     pa_assert(c);
727     pa_assert(PA_REFCNT_VALUE(c) >= 1);
728
729     pa_context_ref(c);
730
731     if (c->version < 13) {
732         pa_context_fail(c, PA_ERR_PROTOCOL);
733         goto finish;
734     }
735
736     if (pa_tagstruct_getu32(t, &channel) < 0 ||
737         !pa_tagstruct_eof(t)) {
738         pa_context_fail(c, PA_ERR_PROTOCOL);
739         goto finish;
740     }
741
742     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
743         goto finish;
744
745     if (s->state != PA_STREAM_READY)
746         goto finish;
747
748     check_smoother_status(s, TRUE, TRUE, FALSE);
749     request_auto_timing_update(s, TRUE);
750
751     if (s->started_callback)
752         s->started_callback(s, s->started_userdata);
753
754 finish:
755     pa_context_unref(c);
756 }
757
758 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
759     pa_context *c = userdata;
760     pa_stream *s;
761     uint32_t channel;
762     pa_proplist *pl = NULL;
763     const char *event;
764
765     pa_assert(pd);
766     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
767     pa_assert(t);
768     pa_assert(c);
769     pa_assert(PA_REFCNT_VALUE(c) >= 1);
770
771     pa_context_ref(c);
772
773     if (c->version < 15) {
774         pa_context_fail(c, PA_ERR_PROTOCOL);
775         goto finish;
776     }
777
778     pl = pa_proplist_new();
779
780     if (pa_tagstruct_getu32(t, &channel) < 0 ||
781         pa_tagstruct_gets(t, &event) < 0 ||
782         pa_tagstruct_get_proplist(t, pl) < 0 ||
783         !pa_tagstruct_eof(t) || !event) {
784         pa_context_fail(c, PA_ERR_PROTOCOL);
785         goto finish;
786     }
787
788     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
789         goto finish;
790
791     if (s->state != PA_STREAM_READY)
792         goto finish;
793
794     if (pa_streq(event, PA_STREAM_EVENT_FORMAT_LOST)) {
795         /* Let client know what the running time was when the stream had to be killed  */
796         pa_usec_t stream_time;
797         if (pa_stream_get_time(s, &stream_time) == 0)
798             pa_proplist_setf(pl, "stream-time", "%llu", (unsigned long long) stream_time);
799     }
800
801     if (s->event_callback)
802         s->event_callback(s, event, pl, s->event_userdata);
803
804 finish:
805     pa_context_unref(c);
806
807     if (pl)
808         pa_proplist_free(pl);
809 }
810
811 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
812     pa_stream *s;
813     pa_context *c = userdata;
814     uint32_t bytes, channel;
815
816     pa_assert(pd);
817     pa_assert(command == PA_COMMAND_REQUEST);
818     pa_assert(t);
819     pa_assert(c);
820     pa_assert(PA_REFCNT_VALUE(c) >= 1);
821
822     pa_context_ref(c);
823
824     if (pa_tagstruct_getu32(t, &channel) < 0 ||
825         pa_tagstruct_getu32(t, &bytes) < 0 ||
826         !pa_tagstruct_eof(t)) {
827         pa_context_fail(c, PA_ERR_PROTOCOL);
828         goto finish;
829     }
830
831     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
832         goto finish;
833
834     if (s->state != PA_STREAM_READY)
835         goto finish;
836
837     s->requested_bytes += bytes;
838
839     /* pa_log("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes); */
840
841     if (s->requested_bytes > 0 && s->write_callback)
842         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
843
844 finish:
845     pa_context_unref(c);
846 }
847
848 int64_t pa_stream_get_underflow_index(pa_stream *p)
849 {
850     pa_assert(p);
851     return p->latest_underrun_at_index;
852 }
853
854 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
855     pa_stream *s;
856     pa_context *c = userdata;
857     uint32_t channel;
858     int64_t offset = -1;
859
860     pa_assert(pd);
861     pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
862     pa_assert(t);
863     pa_assert(c);
864     pa_assert(PA_REFCNT_VALUE(c) >= 1);
865
866     pa_context_ref(c);
867
868     if (pa_tagstruct_getu32(t, &channel) < 0) {
869         pa_context_fail(c, PA_ERR_PROTOCOL);
870         goto finish;
871     }
872
873     if (c->version >= 23 && command == PA_COMMAND_UNDERFLOW) {
874         if (pa_tagstruct_gets64(t, &offset) < 0) {
875             pa_context_fail(c, PA_ERR_PROTOCOL);
876             goto finish;
877         }
878     }
879
880     if (!pa_tagstruct_eof(t)) {
881         pa_context_fail(c, PA_ERR_PROTOCOL);
882         goto finish;
883     }
884
885     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
886         goto finish;
887
888     if (s->state != PA_STREAM_READY)
889         goto finish;
890
891     if (offset != -1)
892         s->latest_underrun_at_index = offset;
893
894     if (s->buffer_attr.prebuf > 0)
895         check_smoother_status(s, TRUE, FALSE, TRUE);
896
897     request_auto_timing_update(s, TRUE);
898
899     if (command == PA_COMMAND_OVERFLOW) {
900         if (s->overflow_callback)
901             s->overflow_callback(s, s->overflow_userdata);
902     } else if (command == PA_COMMAND_UNDERFLOW) {
903         if (s->underflow_callback)
904             s->underflow_callback(s, s->underflow_userdata);
905     }
906
907 finish:
908     pa_context_unref(c);
909 }
910
911 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
912     pa_assert(s);
913     pa_assert(PA_REFCNT_VALUE(s) >= 1);
914
915 /*     pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
916
917     if (s->state != PA_STREAM_READY)
918         return;
919
920     if (w) {
921         s->write_index_not_before = s->context->ctag;
922
923         if (s->timing_info_valid)
924             s->timing_info.write_index_corrupt = TRUE;
925
926 /*         pa_log("write_index invalidated"); */
927     }
928
929     if (r) {
930         s->read_index_not_before = s->context->ctag;
931
932         if (s->timing_info_valid)
933             s->timing_info.read_index_corrupt = TRUE;
934
935 /*         pa_log("read_index invalidated"); */
936     }
937
938     request_auto_timing_update(s, TRUE);
939 }
940
941 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
942     pa_stream *s = userdata;
943
944     pa_assert(s);
945     pa_assert(PA_REFCNT_VALUE(s) >= 1);
946
947     pa_stream_ref(s);
948     request_auto_timing_update(s, FALSE);
949     pa_stream_unref(s);
950 }
951
952 static void create_stream_complete(pa_stream *s) {
953     pa_assert(s);
954     pa_assert(PA_REFCNT_VALUE(s) >= 1);
955     pa_assert(s->state == PA_STREAM_CREATING);
956
957     pa_stream_set_state(s, PA_STREAM_READY);
958
959     if (s->requested_bytes > 0 && s->write_callback)
960         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
961
962     if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
963         s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
964         pa_assert(!s->auto_timing_update_event);
965         s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
966
967         request_auto_timing_update(s, TRUE);
968     }
969
970     check_smoother_status(s, TRUE, FALSE, FALSE);
971 }
972
973 static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
974     const char *e;
975
976     pa_assert(s);
977     pa_assert(attr);
978
979     if ((e = getenv("PULSE_LATENCY_MSEC"))) {
980         uint32_t ms;
981
982         if (pa_atou(e, &ms) < 0 || ms <= 0)
983             pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
984         else {
985             attr->maxlength = (uint32_t) -1;
986             attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &s->sample_spec);
987             attr->minreq = (uint32_t) -1;
988             attr->prebuf = (uint32_t) -1;
989             attr->fragsize = attr->tlength;
990         }
991
992         if (flags)
993             *flags |= PA_STREAM_ADJUST_LATENCY;
994     }
995
996     if (s->context->version >= 13)
997         return;
998
999     /* Version older than 0.9.10 didn't do server side buffer_attr
1000      * selection, hence we have to fake it on the client side. */
1001
1002     /* We choose fairly conservative values here, to not confuse
1003      * old clients with extremely large playback buffers */
1004
1005     if (attr->maxlength == (uint32_t) -1)
1006         attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
1007
1008     if (attr->tlength == (uint32_t) -1)
1009         attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */
1010
1011     if (attr->minreq == (uint32_t) -1)
1012         attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
1013
1014     if (attr->prebuf == (uint32_t) -1)
1015         attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
1016
1017     if (attr->fragsize == (uint32_t) -1)
1018         attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
1019 }
1020
1021 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1022     pa_stream *s = userdata;
1023     uint32_t requested_bytes = 0;
1024
1025     pa_assert(pd);
1026     pa_assert(s);
1027     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1028     pa_assert(s->state == PA_STREAM_CREATING);
1029
1030     pa_stream_ref(s);
1031
1032     if (command != PA_COMMAND_REPLY) {
1033         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1034             goto finish;
1035
1036         pa_stream_set_state(s, PA_STREAM_FAILED);
1037         goto finish;
1038     }
1039
1040     if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
1041         s->channel == PA_INVALID_INDEX ||
1042         ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
1043         ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
1044         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1045         goto finish;
1046     }
1047
1048     s->requested_bytes = (int64_t) requested_bytes;
1049
1050     if (s->context->version >= 9) {
1051         if (s->direction == PA_STREAM_PLAYBACK) {
1052             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1053                 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
1054                 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
1055                 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
1056                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1057                 goto finish;
1058             }
1059         } else if (s->direction == PA_STREAM_RECORD) {
1060             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1061                 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
1062                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1063                 goto finish;
1064             }
1065         }
1066     }
1067
1068     if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
1069         pa_sample_spec ss;
1070         pa_channel_map cm;
1071         const char *dn = NULL;
1072         pa_bool_t suspended;
1073
1074         if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1075             pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1076             pa_tagstruct_getu32(t, &s->device_index) < 0 ||
1077             pa_tagstruct_gets(t, &dn) < 0 ||
1078             pa_tagstruct_get_boolean(t, &suspended) < 0) {
1079             pa_context_fail(s->context, PA_ERR_PROTOCOL);
1080             goto finish;
1081         }
1082
1083         if (!dn || s->device_index == PA_INVALID_INDEX ||
1084             ss.channels != cm.channels ||
1085             !pa_channel_map_valid(&cm) ||
1086             !pa_sample_spec_valid(&ss) ||
1087             (s->n_formats == 0 && (
1088                 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
1089                 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
1090                 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))))) {
1091             pa_context_fail(s->context, PA_ERR_PROTOCOL);
1092             goto finish;
1093         }
1094
1095         pa_xfree(s->device_name);
1096         s->device_name = pa_xstrdup(dn);
1097         s->suspended = suspended;
1098
1099         s->channel_map = cm;
1100         s->sample_spec = ss;
1101     }
1102
1103     if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
1104         pa_usec_t usec;
1105
1106         if (pa_tagstruct_get_usec(t, &usec) < 0) {
1107             pa_context_fail(s->context, PA_ERR_PROTOCOL);
1108             goto finish;
1109         }
1110
1111         if (s->direction == PA_STREAM_RECORD)
1112             s->timing_info.configured_source_usec = usec;
1113         else
1114             s->timing_info.configured_sink_usec = usec;
1115     }
1116
1117     if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
1118         || s->context->version >= 22) {
1119
1120         pa_format_info *f = pa_format_info_new();
1121         pa_tagstruct_get_format_info(t, f);
1122
1123         if (pa_format_info_valid(f))
1124             s->format = f;
1125         else {
1126             pa_format_info_free(f);
1127             if (s->n_formats > 0) {
1128                 /* We used the extended API, so we should have got back a proper format */
1129                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1130                 goto finish;
1131             }
1132         }
1133     }
1134
1135     if (!pa_tagstruct_eof(t)) {
1136         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1137         goto finish;
1138     }
1139
1140     if (s->direction == PA_STREAM_RECORD) {
1141         pa_assert(!s->record_memblockq);
1142
1143         s->record_memblockq = pa_memblockq_new(
1144                 "client side record memblockq",
1145                 0,
1146                 s->buffer_attr.maxlength,
1147                 0,
1148                 &s->sample_spec,
1149                 1,
1150                 0,
1151                 0,
1152                 NULL);
1153     }
1154
1155     s->channel_valid = TRUE;
1156     pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);
1157
1158     create_stream_complete(s);
1159
1160 finish:
1161     pa_stream_unref(s);
1162 }
1163
1164 static int create_stream(
1165         pa_stream_direction_t direction,
1166         pa_stream *s,
1167         const char *dev,
1168         const pa_buffer_attr *attr,
1169         pa_stream_flags_t flags,
1170         const pa_cvolume *volume,
1171         pa_stream *sync_stream) {
1172
1173     pa_tagstruct *t;
1174     uint32_t tag;
1175     pa_bool_t volume_set = !!volume;
1176     pa_cvolume cv;
1177     uint32_t i;
1178
1179     pa_assert(s);
1180     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1181     pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1182
1183     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1184     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1185     PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1186     PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1187                                               PA_STREAM_INTERPOLATE_TIMING|
1188                                               PA_STREAM_NOT_MONOTONIC|
1189                                               PA_STREAM_AUTO_TIMING_UPDATE|
1190                                               PA_STREAM_NO_REMAP_CHANNELS|
1191                                               PA_STREAM_NO_REMIX_CHANNELS|
1192                                               PA_STREAM_FIX_FORMAT|
1193                                               PA_STREAM_FIX_RATE|
1194                                               PA_STREAM_FIX_CHANNELS|
1195                                               PA_STREAM_DONT_MOVE|
1196                                               PA_STREAM_VARIABLE_RATE|
1197                                               PA_STREAM_PEAK_DETECT|
1198                                               PA_STREAM_START_MUTED|
1199                                               PA_STREAM_ADJUST_LATENCY|
1200                                               PA_STREAM_EARLY_REQUESTS|
1201                                               PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1202                                               PA_STREAM_START_UNMUTED|
1203                                               PA_STREAM_FAIL_ON_SUSPEND|
1204                                               PA_STREAM_RELATIVE_VOLUME|
1205                                               PA_STREAM_PASSTHROUGH)), PA_ERR_INVALID);
1206
1207
1208     PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1209     PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1210     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1211     /* Although some of the other flags are not supported on older
1212      * version, we don't check for them here, because it doesn't hurt
1213      * when they are passed but actually not supported. This makes
1214      * client development easier */
1215
1216     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1217     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1218     PA_CHECK_VALIDITY(s->context, !volume || s->n_formats || (pa_sample_spec_valid(&s->sample_spec) && volume->channels == s->sample_spec.channels), PA_ERR_INVALID);
1219     PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1220     PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1221
1222     pa_stream_ref(s);
1223
1224     s->direction = direction;
1225
1226     if (sync_stream)
1227         s->syncid = sync_stream->syncid;
1228
1229     if (attr)
1230         s->buffer_attr = *attr;
1231     patch_buffer_attr(s, &s->buffer_attr, &flags);
1232
1233     s->flags = flags;
1234     s->corked = !!(flags & PA_STREAM_START_CORKED);
1235
1236     if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1237         pa_usec_t x;
1238
1239         x = pa_rtclock_now();
1240
1241         pa_assert(!s->smoother);
1242         s->smoother = pa_smoother_new(
1243                 SMOOTHER_ADJUST_TIME,
1244                 SMOOTHER_HISTORY_TIME,
1245                 !(flags & PA_STREAM_NOT_MONOTONIC),
1246                 TRUE,
1247                 SMOOTHER_MIN_HISTORY,
1248                 x,
1249                 TRUE);
1250     }
1251
1252     if (!dev)
1253         dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1254
1255     t = pa_tagstruct_command(
1256             s->context,
1257             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1258             &tag);
1259
1260     if (s->context->version < 13)
1261         pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1262
1263     pa_tagstruct_put(
1264             t,
1265             PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1266             PA_TAG_CHANNEL_MAP, &s->channel_map,
1267             PA_TAG_U32, PA_INVALID_INDEX,
1268             PA_TAG_STRING, dev,
1269             PA_TAG_U32, s->buffer_attr.maxlength,
1270             PA_TAG_BOOLEAN, s->corked,
1271             PA_TAG_INVALID);
1272
1273     if (!volume) {
1274         if (pa_sample_spec_valid(&s->sample_spec))
1275             volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1276         else {
1277             /* This is not really relevant, since no volume was set, and
1278              * the real number of channels is embedded in the format_info
1279              * structure */
1280             volume = pa_cvolume_reset(&cv, PA_CHANNELS_MAX);
1281         }
1282     }
1283
1284     if (s->direction == PA_STREAM_PLAYBACK) {
1285         pa_tagstruct_put(
1286                 t,
1287                 PA_TAG_U32, s->buffer_attr.tlength,
1288                 PA_TAG_U32, s->buffer_attr.prebuf,
1289                 PA_TAG_U32, s->buffer_attr.minreq,
1290                 PA_TAG_U32, s->syncid,
1291                 PA_TAG_INVALID);
1292
1293         pa_tagstruct_put_cvolume(t, volume);
1294     } else
1295         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1296
1297     if (s->context->version >= 12) {
1298         pa_tagstruct_put(
1299                 t,
1300                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1301                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1302                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1303                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1304                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1305                 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1306                 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1307                 PA_TAG_INVALID);
1308     }
1309
1310     if (s->context->version >= 13) {
1311
1312         if (s->direction == PA_STREAM_PLAYBACK)
1313             pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1314         else
1315             pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1316
1317         pa_tagstruct_put(
1318                 t,
1319                 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1320                 PA_TAG_PROPLIST, s->proplist,
1321                 PA_TAG_INVALID);
1322
1323         if (s->direction == PA_STREAM_RECORD)
1324             pa_tagstruct_putu32(t, s->direct_on_input);
1325     }
1326
1327     if (s->context->version >= 14) {
1328
1329         if (s->direction == PA_STREAM_PLAYBACK)
1330             pa_tagstruct_put_boolean(t, volume_set);
1331
1332         pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1333     }
1334
1335     if (s->context->version >= 15) {
1336
1337         if (s->direction == PA_STREAM_PLAYBACK)
1338             pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1339
1340         pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1341         pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1342     }
1343
1344     if (s->context->version >= 17 && s->direction == PA_STREAM_PLAYBACK)
1345         pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1346
1347     if (s->context->version >= 18 && s->direction == PA_STREAM_PLAYBACK)
1348         pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1349
1350     if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
1351         || s->context->version >= 22) {
1352
1353         pa_tagstruct_putu8(t, s->n_formats);
1354         for (i = 0; i < s->n_formats; i++)
1355             pa_tagstruct_put_format_info(t, s->req_formats[i]);
1356     }
1357
1358     if (s->context->version >= 22 && s->direction == PA_STREAM_RECORD) {
1359         pa_tagstruct_put_cvolume(t, volume);
1360         pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1361         pa_tagstruct_put_boolean(t, volume_set);
1362         pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1363         pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1364         pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1365     }
1366
1367     pa_pstream_send_tagstruct(s->context->pstream, t);
1368     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1369
1370     pa_stream_set_state(s, PA_STREAM_CREATING);
1371
1372     pa_stream_unref(s);
1373     return 0;
1374 }
1375
1376 int pa_stream_connect_playback(
1377         pa_stream *s,
1378         const char *dev,
1379         const pa_buffer_attr *attr,
1380         pa_stream_flags_t flags,
1381         const pa_cvolume *volume,
1382         pa_stream *sync_stream) {
1383
1384     pa_assert(s);
1385     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1386
1387     return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1388 }
1389
1390 int pa_stream_connect_record(
1391         pa_stream *s,
1392         const char *dev,
1393         const pa_buffer_attr *attr,
1394         pa_stream_flags_t flags) {
1395
1396     pa_assert(s);
1397     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1398
1399     return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1400 }
1401
1402 int pa_stream_begin_write(
1403         pa_stream *s,
1404         void **data,
1405         size_t *nbytes) {
1406
1407     pa_assert(s);
1408     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1409
1410     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1411     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1412     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1413     PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1414     PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1415
1416     if (*nbytes != (size_t) -1) {
1417         size_t m, fs;
1418
1419         m = pa_mempool_block_size_max(s->context->mempool);
1420         fs = pa_frame_size(&s->sample_spec);
1421
1422         m = (m / fs) * fs;
1423         if (*nbytes > m)
1424             *nbytes = m;
1425     }
1426
1427     if (!s->write_memblock) {
1428         s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1429         s->write_data = pa_memblock_acquire(s->write_memblock);
1430     }
1431
1432     *data = s->write_data;
1433     *nbytes = pa_memblock_get_length(s->write_memblock);
1434
1435     return 0;
1436 }
1437
1438 int pa_stream_cancel_write(
1439         pa_stream *s) {
1440
1441     pa_assert(s);
1442     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1443
1444     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1445     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1446     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1447     PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1448
1449     pa_assert(s->write_data);
1450
1451     pa_memblock_release(s->write_memblock);
1452     pa_memblock_unref(s->write_memblock);
1453     s->write_memblock = NULL;
1454     s->write_data = NULL;
1455
1456     return 0;
1457 }
1458
1459 int pa_stream_write(
1460         pa_stream *s,
1461         const void *data,
1462         size_t length,
1463         pa_free_cb_t free_cb,
1464         int64_t offset,
1465         pa_seek_mode_t seek) {
1466
1467     pa_assert(s);
1468     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1469     pa_assert(data);
1470
1471     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1472     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1473     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1474     PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1475     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1476     PA_CHECK_VALIDITY(s->context,
1477                       !s->write_memblock ||
1478                       ((data >= s->write_data) &&
1479                        ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1480                       PA_ERR_INVALID);
1481     PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1482
1483     if (s->write_memblock) {
1484         pa_memchunk chunk;
1485
1486         /* pa_stream_write_begin() was called before */
1487
1488         pa_memblock_release(s->write_memblock);
1489
1490         chunk.memblock = s->write_memblock;
1491         chunk.index = (const char *) data - (const char *) s->write_data;
1492         chunk.length = length;
1493
1494         s->write_memblock = NULL;
1495         s->write_data = NULL;
1496
1497         pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1498         pa_memblock_unref(chunk.memblock);
1499
1500     } else {
1501         pa_seek_mode_t t_seek = seek;
1502         int64_t t_offset = offset;
1503         size_t t_length = length;
1504         const void *t_data = data;
1505
1506         /* pa_stream_write_begin() was not called before */
1507
1508         while (t_length > 0) {
1509             pa_memchunk chunk;
1510
1511             chunk.index = 0;
1512
1513             if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1514                 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1515                 chunk.length = t_length;
1516             } else {
1517                 void *d;
1518
1519                 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1520                 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1521
1522                 d = pa_memblock_acquire(chunk.memblock);
1523                 memcpy(d, t_data, chunk.length);
1524                 pa_memblock_release(chunk.memblock);
1525             }
1526
1527             pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1528
1529             t_offset = 0;
1530             t_seek = PA_SEEK_RELATIVE;
1531
1532             t_data = (const uint8_t*) t_data + chunk.length;
1533             t_length -= chunk.length;
1534
1535             pa_memblock_unref(chunk.memblock);
1536         }
1537
1538         if (free_cb && pa_pstream_get_shm(s->context->pstream))
1539             free_cb((void*) data);
1540     }
1541
1542     /* This is obviously wrong since we ignore the seeking index . But
1543      * that's OK, the server side applies the same error */
1544     s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1545
1546     /* pa_log("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes); */
1547
1548     if (s->direction == PA_STREAM_PLAYBACK) {
1549
1550         /* Update latency request correction */
1551         if (s->write_index_corrections[s->current_write_index_correction].valid) {
1552
1553             if (seek == PA_SEEK_ABSOLUTE) {
1554                 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1555                 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1556                 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1557             } else if (seek == PA_SEEK_RELATIVE) {
1558                 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1559                     s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1560             } else
1561                 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1562         }
1563
1564         /* Update the write index in the already available latency data */
1565         if (s->timing_info_valid) {
1566
1567             if (seek == PA_SEEK_ABSOLUTE) {
1568                 s->timing_info.write_index_corrupt = FALSE;
1569                 s->timing_info.write_index = offset + (int64_t) length;
1570             } else if (seek == PA_SEEK_RELATIVE) {
1571                 if (!s->timing_info.write_index_corrupt)
1572                     s->timing_info.write_index += offset + (int64_t) length;
1573             } else
1574                 s->timing_info.write_index_corrupt = TRUE;
1575         }
1576
1577         if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1578             request_auto_timing_update(s, TRUE);
1579     }
1580
1581     return 0;
1582 }
1583
1584 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1585     pa_assert(s);
1586     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1587     pa_assert(data);
1588     pa_assert(length);
1589
1590     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1591     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1592     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1593
1594     if (!s->peek_memchunk.memblock) {
1595
1596         if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1597             *data = NULL;
1598             *length = 0;
1599             return 0;
1600         }
1601
1602         s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1603     }
1604
1605     pa_assert(s->peek_data);
1606     *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1607     *length = s->peek_memchunk.length;
1608     return 0;
1609 }
1610
1611 int pa_stream_drop(pa_stream *s) {
1612     pa_assert(s);
1613     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1614
1615     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1616     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1617     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1618     PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1619
1620     pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1621
1622     /* Fix the simulated local read index */
1623     if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1624         s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1625
1626     pa_assert(s->peek_data);
1627     pa_memblock_release(s->peek_memchunk.memblock);
1628     pa_memblock_unref(s->peek_memchunk.memblock);
1629     pa_memchunk_reset(&s->peek_memchunk);
1630
1631     return 0;
1632 }
1633
1634 size_t pa_stream_writable_size(pa_stream *s) {
1635     pa_assert(s);
1636     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1637
1638     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1639     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1640     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1641
1642     return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1643 }
1644
1645 size_t pa_stream_readable_size(pa_stream *s) {
1646     pa_assert(s);
1647     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1648
1649     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1650     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1651     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1652
1653     return pa_memblockq_get_length(s->record_memblockq);
1654 }
1655
1656 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1657     pa_operation *o;
1658     pa_tagstruct *t;
1659     uint32_t tag;
1660
1661     pa_assert(s);
1662     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1663
1664     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1665     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1666     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1667
1668     /* Ask for a timing update before we cork/uncork to get the best
1669      * accuracy for the transport latency suitable for the
1670      * check_smoother_status() call in the started callback */
1671     request_auto_timing_update(s, TRUE);
1672
1673     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1674
1675     t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1676     pa_tagstruct_putu32(t, s->channel);
1677     pa_pstream_send_tagstruct(s->context->pstream, t);
1678     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1679
1680     /* This might cause the read index to continue again, hence
1681      * let's request a timing update */
1682     request_auto_timing_update(s, TRUE);
1683
1684     return o;
1685 }
1686
1687 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1688     pa_usec_t usec;
1689
1690     pa_assert(s);
1691     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1692     pa_assert(s->state == PA_STREAM_READY);
1693     pa_assert(s->direction != PA_STREAM_UPLOAD);
1694     pa_assert(s->timing_info_valid);
1695     pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1696     pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1697
1698     if (s->direction == PA_STREAM_PLAYBACK) {
1699         /* The last byte that was written into the output device
1700          * had this time value associated */
1701         usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1702
1703         if (!s->corked && !s->suspended) {
1704
1705             if (!ignore_transport)
1706                 /* Because the latency info took a little time to come
1707                  * to us, we assume that the real output time is actually
1708                  * a little ahead */
1709                 usec += s->timing_info.transport_usec;
1710
1711             /* However, the output device usually maintains a buffer
1712                too, hence the real sample currently played is a little
1713                back  */
1714             if (s->timing_info.sink_usec >= usec)
1715                 usec = 0;
1716             else
1717                 usec -= s->timing_info.sink_usec;
1718         }
1719
1720     } else {
1721         pa_assert(s->direction == PA_STREAM_RECORD);
1722
1723         /* The last byte written into the server side queue had
1724          * this time value associated */
1725         usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1726
1727         if (!s->corked && !s->suspended) {
1728
1729             if (!ignore_transport)
1730                 /* Add transport latency */
1731                 usec += s->timing_info.transport_usec;
1732
1733             /* Add latency of data in device buffer */
1734             usec += s->timing_info.source_usec;
1735
1736             /* If this is a monitor source, we need to correct the
1737              * time by the playback device buffer */
1738             if (s->timing_info.sink_usec >= usec)
1739                 usec = 0;
1740             else
1741                 usec -= s->timing_info.sink_usec;
1742         }
1743     }
1744
1745     return usec;
1746 }
1747
1748 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1749     pa_operation *o = userdata;
1750     struct timeval local, remote, now;
1751     pa_timing_info *i;
1752     pa_bool_t playing = FALSE;
1753     uint64_t underrun_for = 0, playing_for = 0;
1754
1755     pa_assert(pd);
1756     pa_assert(o);
1757     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1758
1759     if (!o->context || !o->stream)
1760         goto finish;
1761
1762     i = &o->stream->timing_info;
1763
1764     o->stream->timing_info_valid = FALSE;
1765     i->write_index_corrupt = TRUE;
1766     i->read_index_corrupt = TRUE;
1767
1768     if (command != PA_COMMAND_REPLY) {
1769         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1770             goto finish;
1771
1772     } else {
1773
1774         if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1775             pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1776             pa_tagstruct_get_boolean(t, &playing) < 0 ||
1777             pa_tagstruct_get_timeval(t, &local) < 0 ||
1778             pa_tagstruct_get_timeval(t, &remote) < 0 ||
1779             pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1780             pa_tagstruct_gets64(t, &i->read_index) < 0) {
1781
1782             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1783             goto finish;
1784         }
1785
1786         if (o->context->version >= 13 &&
1787             o->stream->direction == PA_STREAM_PLAYBACK)
1788             if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1789                 pa_tagstruct_getu64(t, &playing_for) < 0) {
1790
1791                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1792                 goto finish;
1793             }
1794
1795
1796         if (!pa_tagstruct_eof(t)) {
1797             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1798             goto finish;
1799         }
1800         o->stream->timing_info_valid = TRUE;
1801         i->write_index_corrupt = FALSE;
1802         i->read_index_corrupt = FALSE;
1803
1804         i->playing = (int) playing;
1805         i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1806
1807         pa_gettimeofday(&now);
1808
1809         /* Calculate timestamps */
1810         if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1811             /* local and remote seem to have synchronized clocks */
1812
1813             if (o->stream->direction == PA_STREAM_PLAYBACK)
1814                 i->transport_usec = pa_timeval_diff(&remote, &local);
1815             else
1816                 i->transport_usec = pa_timeval_diff(&now, &remote);
1817
1818             i->synchronized_clocks = TRUE;
1819             i->timestamp = remote;
1820         } else {
1821             /* clocks are not synchronized, let's estimate latency then */
1822             i->transport_usec = pa_timeval_diff(&now, &local)/2;
1823             i->synchronized_clocks = FALSE;
1824             i->timestamp = local;
1825             pa_timeval_add(&i->timestamp, i->transport_usec);
1826         }
1827
1828         /* Invalidate read and write indexes if necessary */
1829         if (tag < o->stream->read_index_not_before)
1830             i->read_index_corrupt = TRUE;
1831
1832         if (tag < o->stream->write_index_not_before)
1833             i->write_index_corrupt = TRUE;
1834
1835         if (o->stream->direction == PA_STREAM_PLAYBACK) {
1836             /* Write index correction */
1837
1838             int n, j;
1839             uint32_t ctag = tag;
1840
1841             /* Go through the saved correction values and add up the
1842              * total correction.*/
1843             for (n = 0, j = o->stream->current_write_index_correction+1;
1844                  n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1845                  n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1846
1847                 /* Step over invalid data or out-of-date data */
1848                 if (!o->stream->write_index_corrections[j].valid ||
1849                     o->stream->write_index_corrections[j].tag < ctag)
1850                     continue;
1851
1852                 /* Make sure that everything is in order */
1853                 ctag = o->stream->write_index_corrections[j].tag+1;
1854
1855                 /* Now fix the write index */
1856                 if (o->stream->write_index_corrections[j].corrupt) {
1857                     /* A corrupting seek was made */
1858                     i->write_index_corrupt = TRUE;
1859                 } else if (o->stream->write_index_corrections[j].absolute) {
1860                     /* An absolute seek was made */
1861                     i->write_index = o->stream->write_index_corrections[j].value;
1862                     i->write_index_corrupt = FALSE;
1863                 } else if (!i->write_index_corrupt) {
1864                     /* A relative seek was made */
1865                     i->write_index += o->stream->write_index_corrections[j].value;
1866                 }
1867             }
1868
1869             /* Clear old correction entries */
1870             for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1871                 if (!o->stream->write_index_corrections[n].valid)
1872                     continue;
1873
1874                 if (o->stream->write_index_corrections[n].tag <= tag)
1875                     o->stream->write_index_corrections[n].valid = FALSE;
1876             }
1877         }
1878
1879         if (o->stream->direction == PA_STREAM_RECORD) {
1880             /* Read index correction */
1881
1882             if (!i->read_index_corrupt)
1883                 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1884         }
1885
1886         /* Update smoother if we're not corked */
1887         if (o->stream->smoother && !o->stream->corked) {
1888             pa_usec_t u, x;
1889
1890             u = x = pa_rtclock_now() - i->transport_usec;
1891
1892             if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1893                 pa_usec_t su;
1894
1895                 /* If we weren't playing then it will take some time
1896                  * until the audio will actually come out through the
1897                  * speakers. Since we follow that timing here, we need
1898                  * to try to fix this up */
1899
1900                 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1901
1902                 if (su < i->sink_usec)
1903                     x += i->sink_usec - su;
1904             }
1905
1906             if (!i->playing)
1907                 pa_smoother_pause(o->stream->smoother, x);
1908
1909             /* Update the smoother */
1910             if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1911                 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1912                 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1913
1914             if (i->playing)
1915                 pa_smoother_resume(o->stream->smoother, x, TRUE);
1916         }
1917     }
1918
1919     o->stream->auto_timing_update_requested = FALSE;
1920
1921     if (o->stream->latency_update_callback)
1922         o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1923
1924     if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1925         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1926         cb(o->stream, o->stream->timing_info_valid, o->userdata);
1927     }
1928
1929 finish:
1930
1931     pa_operation_done(o);
1932     pa_operation_unref(o);
1933 }
1934
1935 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1936     uint32_t tag;
1937     pa_operation *o;
1938     pa_tagstruct *t;
1939     struct timeval now;
1940     int cidx = 0;
1941
1942     pa_assert(s);
1943     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1944
1945     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1946     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1947     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1948
1949     if (s->direction == PA_STREAM_PLAYBACK) {
1950         /* Find a place to store the write_index correction data for this entry */
1951         cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1952
1953         /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1954         PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1955     }
1956     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1957
1958     t = pa_tagstruct_command(
1959             s->context,
1960             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1961             &tag);
1962     pa_tagstruct_putu32(t, s->channel);
1963     pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1964
1965     pa_pstream_send_tagstruct(s->context->pstream, t);
1966     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1967
1968     if (s->direction == PA_STREAM_PLAYBACK) {
1969         /* Fill in initial correction data */
1970
1971         s->current_write_index_correction = cidx;
1972
1973         s->write_index_corrections[cidx].valid = TRUE;
1974         s->write_index_corrections[cidx].absolute = FALSE;
1975         s->write_index_corrections[cidx].corrupt = FALSE;
1976         s->write_index_corrections[cidx].tag = tag;
1977         s->write_index_corrections[cidx].value = 0;
1978     }
1979
1980     return o;
1981 }
1982
1983 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1984     pa_stream *s = userdata;
1985
1986     pa_assert(pd);
1987     pa_assert(s);
1988     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1989
1990     pa_stream_ref(s);
1991
1992     if (command != PA_COMMAND_REPLY) {
1993         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1994             goto finish;
1995
1996         pa_stream_set_state(s, PA_STREAM_FAILED);
1997         goto finish;
1998     } else if (!pa_tagstruct_eof(t)) {
1999         pa_context_fail(s->context, PA_ERR_PROTOCOL);
2000         goto finish;
2001     }
2002
2003     pa_stream_set_state(s, PA_STREAM_TERMINATED);
2004
2005 finish:
2006     pa_stream_unref(s);
2007 }
2008
2009 int pa_stream_disconnect(pa_stream *s) {
2010     pa_tagstruct *t;
2011     uint32_t tag;
2012
2013     pa_assert(s);
2014     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2015
2016     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2017     PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
2018     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
2019
2020     pa_stream_ref(s);
2021
2022     t = pa_tagstruct_command(
2023             s->context,
2024             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
2025                         (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
2026             &tag);
2027     pa_tagstruct_putu32(t, s->channel);
2028     pa_pstream_send_tagstruct(s->context->pstream, t);
2029     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
2030
2031     pa_stream_unref(s);
2032     return 0;
2033 }
2034
2035 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2036     pa_assert(s);
2037     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2038
2039     if (pa_detect_fork())
2040         return;
2041
2042     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2043         return;
2044
2045     s->read_callback = cb;
2046     s->read_userdata = userdata;
2047 }
2048
2049 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2050     pa_assert(s);
2051     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2052
2053     if (pa_detect_fork())
2054         return;
2055
2056     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2057         return;
2058
2059     s->write_callback = cb;
2060     s->write_userdata = userdata;
2061 }
2062
2063 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2064     pa_assert(s);
2065     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2066
2067     if (pa_detect_fork())
2068         return;
2069
2070     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2071         return;
2072
2073     s->state_callback = cb;
2074     s->state_userdata = userdata;
2075 }
2076
2077 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2078     pa_assert(s);
2079     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2080
2081     if (pa_detect_fork())
2082         return;
2083
2084     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2085         return;
2086
2087     s->overflow_callback = cb;
2088     s->overflow_userdata = userdata;
2089 }
2090
2091 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2092     pa_assert(s);
2093     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2094
2095     if (pa_detect_fork())
2096         return;
2097
2098     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2099         return;
2100
2101     s->underflow_callback = cb;
2102     s->underflow_userdata = userdata;
2103 }
2104
2105 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2106     pa_assert(s);
2107     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2108
2109     if (pa_detect_fork())
2110         return;
2111
2112     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2113         return;
2114
2115     s->latency_update_callback = cb;
2116     s->latency_update_userdata = userdata;
2117 }
2118
2119 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2120     pa_assert(s);
2121     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2122
2123     if (pa_detect_fork())
2124         return;
2125
2126     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2127         return;
2128
2129     s->moved_callback = cb;
2130     s->moved_userdata = userdata;
2131 }
2132
2133 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2134     pa_assert(s);
2135     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2136
2137     if (pa_detect_fork())
2138         return;
2139
2140     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2141         return;
2142
2143     s->suspended_callback = cb;
2144     s->suspended_userdata = userdata;
2145 }
2146
2147 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2148     pa_assert(s);
2149     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2150
2151     if (pa_detect_fork())
2152         return;
2153
2154     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2155         return;
2156
2157     s->started_callback = cb;
2158     s->started_userdata = userdata;
2159 }
2160
2161 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
2162     pa_assert(s);
2163     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2164
2165     if (pa_detect_fork())
2166         return;
2167
2168     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2169         return;
2170
2171     s->event_callback = cb;
2172     s->event_userdata = userdata;
2173 }
2174
2175 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2176     pa_assert(s);
2177     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2178
2179     if (pa_detect_fork())
2180         return;
2181
2182     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2183         return;
2184
2185     s->buffer_attr_callback = cb;
2186     s->buffer_attr_userdata = userdata;
2187 }
2188
2189 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2190     pa_operation *o = userdata;
2191     int success = 1;
2192
2193     pa_assert(pd);
2194     pa_assert(o);
2195     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2196
2197     if (!o->context)
2198         goto finish;
2199
2200     if (command != PA_COMMAND_REPLY) {
2201         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2202             goto finish;
2203
2204         success = 0;
2205     } else if (!pa_tagstruct_eof(t)) {
2206         pa_context_fail(o->context, PA_ERR_PROTOCOL);
2207         goto finish;
2208     }
2209
2210     if (o->callback) {
2211         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2212         cb(o->stream, success, o->userdata);
2213     }
2214
2215 finish:
2216     pa_operation_done(o);
2217     pa_operation_unref(o);
2218 }
2219
2220 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2221     pa_operation *o;
2222     pa_tagstruct *t;
2223     uint32_t tag;
2224
2225     pa_assert(s);
2226     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2227
2228     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2229     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2230     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2231
2232     /* Ask for a timing update before we cork/uncork to get the best
2233      * accuracy for the transport latency suitable for the
2234      * check_smoother_status() call in the started callback */
2235     request_auto_timing_update(s, TRUE);
2236
2237     s->corked = b;
2238
2239     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2240
2241     t = pa_tagstruct_command(
2242             s->context,
2243             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2244             &tag);
2245     pa_tagstruct_putu32(t, s->channel);
2246     pa_tagstruct_put_boolean(t, !!b);
2247     pa_pstream_send_tagstruct(s->context->pstream, t);
2248     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2249
2250     check_smoother_status(s, FALSE, FALSE, FALSE);
2251
2252     /* This might cause the indexes to hang/start again, hence let's
2253      * request a timing update, after the cork/uncork, too */
2254     request_auto_timing_update(s, TRUE);
2255
2256     return o;
2257 }
2258
2259 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2260     pa_tagstruct *t;
2261     pa_operation *o;
2262     uint32_t tag;
2263
2264     pa_assert(s);
2265     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2266
2267     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2268     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2269
2270     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2271
2272     t = pa_tagstruct_command(s->context, command, &tag);
2273     pa_tagstruct_putu32(t, s->channel);
2274     pa_pstream_send_tagstruct(s->context->pstream, t);
2275     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2276
2277     return o;
2278 }
2279
2280 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2281     pa_operation *o;
2282
2283     pa_assert(s);
2284     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2285
2286     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2287     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2288     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2289
2290     /* Ask for a timing update *before* the flush, so that the
2291      * transport usec is as up to date as possible when we get the
2292      * underflow message and update the smoother status*/
2293     request_auto_timing_update(s, TRUE);
2294
2295     if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2296         return NULL;
2297
2298     if (s->direction == PA_STREAM_PLAYBACK) {
2299
2300         if (s->write_index_corrections[s->current_write_index_correction].valid)
2301             s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2302
2303         if (s->buffer_attr.prebuf > 0)
2304             check_smoother_status(s, FALSE, FALSE, TRUE);
2305
2306         /* This will change the write index, but leave the
2307          * read index untouched. */
2308         invalidate_indexes(s, FALSE, TRUE);
2309
2310     } else
2311         /* For record streams this has no influence on the write
2312          * index, but the read index might jump. */
2313         invalidate_indexes(s, TRUE, FALSE);
2314
2315     /* Note that we do not update requested_bytes here. This is
2316      * because we cannot really know how data actually was dropped
2317      * from the write index due to this. This 'error' will be applied
2318      * by both client and server and hence we should be fine. */
2319
2320     return o;
2321 }
2322
2323 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2324     pa_operation *o;
2325
2326     pa_assert(s);
2327     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2328
2329     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2330     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2331     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2332     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2333
2334     /* Ask for a timing update before we cork/uncork to get the best
2335      * accuracy for the transport latency suitable for the
2336      * check_smoother_status() call in the started callback */
2337     request_auto_timing_update(s, TRUE);
2338
2339     if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2340         return NULL;
2341
2342     /* This might cause the read index to hang again, hence
2343      * let's request a timing update */
2344     request_auto_timing_update(s, TRUE);
2345
2346     return o;
2347 }
2348
2349 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2350     pa_operation *o;
2351
2352     pa_assert(s);
2353     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2354
2355     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2356     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2357     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2358     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2359
2360     /* Ask for a timing update before we cork/uncork to get the best
2361      * accuracy for the transport latency suitable for the
2362      * check_smoother_status() call in the started callback */
2363     request_auto_timing_update(s, TRUE);
2364
2365     if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2366         return NULL;
2367
2368     /* This might cause the read index to start moving again, hence
2369      * let's request a timing update */
2370     request_auto_timing_update(s, TRUE);
2371
2372     return o;
2373 }
2374
2375 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2376     pa_operation *o;
2377
2378     pa_assert(s);
2379     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2380     pa_assert(name);
2381
2382     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2383     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2384     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2385
2386     if (s->context->version >= 13) {
2387         pa_proplist *p = pa_proplist_new();
2388
2389         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2390         o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2391         pa_proplist_free(p);
2392     } else {
2393         pa_tagstruct *t;
2394         uint32_t tag;
2395
2396         o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2397         t = pa_tagstruct_command(
2398                 s->context,
2399                 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2400                 &tag);
2401         pa_tagstruct_putu32(t, s->channel);
2402         pa_tagstruct_puts(t, name);
2403         pa_pstream_send_tagstruct(s->context->pstream, t);
2404         pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2405     }
2406
2407     return o;
2408 }
2409
2410 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2411     pa_usec_t usec;
2412
2413     pa_assert(s);
2414     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2415
2416     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2417     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2418     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2419     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2420     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2421     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2422
2423     if (s->smoother)
2424         usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2425     else
2426         usec = calc_time(s, FALSE);
2427
2428     /* Make sure the time runs monotonically */
2429     if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2430         if (usec < s->previous_time)
2431             usec = s->previous_time;
2432         else
2433             s->previous_time = usec;
2434     }
2435
2436     if (r_usec)
2437         *r_usec = usec;
2438
2439     return 0;
2440 }
2441
2442 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2443     pa_assert(s);
2444     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2445
2446     if (negative)
2447         *negative = 0;
2448
2449     if (a >= b)
2450         return a-b;
2451     else {
2452         if (negative && s->direction == PA_STREAM_RECORD) {
2453             *negative = 1;
2454             return b-a;
2455         } else
2456             return 0;
2457     }
2458 }
2459
2460 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2461     pa_usec_t t, c;
2462     int r;
2463     int64_t cindex;
2464
2465     pa_assert(s);
2466     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2467     pa_assert(r_usec);
2468
2469     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2470     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2471     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2472     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2473     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2474     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2475
2476     if ((r = pa_stream_get_time(s, &t)) < 0)
2477         return r;
2478
2479     if (s->direction == PA_STREAM_PLAYBACK)
2480         cindex = s->timing_info.write_index;
2481     else
2482         cindex = s->timing_info.read_index;
2483
2484     if (cindex < 0)
2485         cindex = 0;
2486
2487     c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2488
2489     if (s->direction == PA_STREAM_PLAYBACK)
2490         *r_usec = time_counter_diff(s, c, t, negative);
2491     else
2492         *r_usec = time_counter_diff(s, t, c, negative);
2493
2494     return 0;
2495 }
2496
2497 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2498     pa_assert(s);
2499     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2500
2501     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2502     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2503     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2504     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2505
2506     return &s->timing_info;
2507 }
2508
2509 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2510     pa_assert(s);
2511     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2512
2513     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2514
2515     return &s->sample_spec;
2516 }
2517
2518 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2519     pa_assert(s);
2520     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2521
2522     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2523
2524     return &s->channel_map;
2525 }
2526
2527 const pa_format_info* pa_stream_get_format_info(pa_stream *s) {
2528     pa_assert(s);
2529     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2530
2531     /* We don't have the format till routing is done */
2532     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2533     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2534
2535     return s->format;
2536 }
2537 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2538     pa_assert(s);
2539     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2540
2541     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2542     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2543     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2544
2545     return &s->buffer_attr;
2546 }
2547
2548 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2549     pa_operation *o = userdata;
2550     int success = 1;
2551
2552     pa_assert(pd);
2553     pa_assert(o);
2554     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2555
2556     if (!o->context)
2557         goto finish;
2558
2559     if (command != PA_COMMAND_REPLY) {
2560         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2561             goto finish;
2562
2563         success = 0;
2564     } else {
2565         if (o->stream->direction == PA_STREAM_PLAYBACK) {
2566             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2567                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2568                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2569                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2570                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2571                 goto finish;
2572             }
2573         } else if (o->stream->direction == PA_STREAM_RECORD) {
2574             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2575                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2576                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2577                 goto finish;
2578             }
2579         }
2580
2581         if (o->stream->context->version >= 13) {
2582             pa_usec_t usec;
2583
2584             if (pa_tagstruct_get_usec(t, &usec) < 0) {
2585                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2586                 goto finish;
2587             }
2588
2589             if (o->stream->direction == PA_STREAM_RECORD)
2590                 o->stream->timing_info.configured_source_usec = usec;
2591             else
2592                 o->stream->timing_info.configured_sink_usec = usec;
2593         }
2594
2595         if (!pa_tagstruct_eof(t)) {
2596             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2597             goto finish;
2598         }
2599     }
2600
2601     if (o->callback) {
2602         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2603         cb(o->stream, success, o->userdata);
2604     }
2605
2606 finish:
2607     pa_operation_done(o);
2608     pa_operation_unref(o);
2609 }
2610
2611
2612 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2613     pa_operation *o;
2614     pa_tagstruct *t;
2615     uint32_t tag;
2616     pa_buffer_attr copy;
2617
2618     pa_assert(s);
2619     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2620     pa_assert(attr);
2621
2622     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2623     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2624     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2625     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2626
2627     /* Ask for a timing update before we cork/uncork to get the best
2628      * accuracy for the transport latency suitable for the
2629      * check_smoother_status() call in the started callback */
2630     request_auto_timing_update(s, TRUE);
2631
2632     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2633
2634     t = pa_tagstruct_command(
2635             s->context,
2636             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2637             &tag);
2638     pa_tagstruct_putu32(t, s->channel);
2639
2640     copy = *attr;
2641     patch_buffer_attr(s, &copy, NULL);
2642     attr = &copy;
2643
2644     pa_tagstruct_putu32(t, attr->maxlength);
2645
2646     if (s->direction == PA_STREAM_PLAYBACK)
2647         pa_tagstruct_put(
2648                 t,
2649                 PA_TAG_U32, attr->tlength,
2650                 PA_TAG_U32, attr->prebuf,
2651                 PA_TAG_U32, attr->minreq,
2652                 PA_TAG_INVALID);
2653     else
2654         pa_tagstruct_putu32(t, attr->fragsize);
2655
2656     if (s->context->version >= 13)
2657         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2658
2659     if (s->context->version >= 14)
2660         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2661
2662     pa_pstream_send_tagstruct(s->context->pstream, t);
2663     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2664
2665     /* This might cause changes in the read/write index, hence let's
2666      * request a timing update */
2667     request_auto_timing_update(s, TRUE);
2668
2669     return o;
2670 }
2671
2672 uint32_t pa_stream_get_device_index(pa_stream *s) {
2673     pa_assert(s);
2674     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2675
2676     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2677     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2678     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2679     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2680     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2681
2682     return s->device_index;
2683 }
2684
2685 const char *pa_stream_get_device_name(pa_stream *s) {
2686     pa_assert(s);
2687     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2688
2689     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2690     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2691     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2692     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2693     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2694
2695     return s->device_name;
2696 }
2697
2698 int pa_stream_is_suspended(pa_stream *s) {
2699     pa_assert(s);
2700     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2701
2702     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2703     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2704     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2705     PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2706
2707     return s->suspended;
2708 }
2709
2710 int pa_stream_is_corked(pa_stream *s) {
2711     pa_assert(s);
2712     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2713
2714     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2715     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2716     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2717
2718     return s->corked;
2719 }
2720
2721 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2722     pa_operation *o = userdata;
2723     int success = 1;
2724
2725     pa_assert(pd);
2726     pa_assert(o);
2727     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2728
2729     if (!o->context)
2730         goto finish;
2731
2732     if (command != PA_COMMAND_REPLY) {
2733         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2734             goto finish;
2735
2736         success = 0;
2737     } else {
2738
2739         if (!pa_tagstruct_eof(t)) {
2740             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2741             goto finish;
2742         }
2743     }
2744
2745     o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2746     pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2747
2748     if (o->callback) {
2749         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2750         cb(o->stream, success, o->userdata);
2751     }
2752
2753 finish:
2754     pa_operation_done(o);
2755     pa_operation_unref(o);
2756 }
2757
2758
2759 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2760     pa_operation *o;
2761     pa_tagstruct *t;
2762     uint32_t tag;
2763
2764     pa_assert(s);
2765     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2766
2767     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2768     PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2769     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2770     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2771     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2772     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2773
2774     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2775     o->private = PA_UINT_TO_PTR(rate);
2776
2777     t = pa_tagstruct_command(
2778             s->context,
2779             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2780             &tag);
2781     pa_tagstruct_putu32(t, s->channel);
2782     pa_tagstruct_putu32(t, rate);
2783
2784     pa_pstream_send_tagstruct(s->context->pstream, t);
2785     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2786
2787     return o;
2788 }
2789
2790 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2791     pa_operation *o;
2792     pa_tagstruct *t;
2793     uint32_t tag;
2794
2795     pa_assert(s);
2796     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2797
2798     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2799     PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2800     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2801     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2802     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2803
2804     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2805
2806     t = pa_tagstruct_command(
2807             s->context,
2808             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2809             &tag);
2810     pa_tagstruct_putu32(t, s->channel);
2811     pa_tagstruct_putu32(t, (uint32_t) mode);
2812     pa_tagstruct_put_proplist(t, p);
2813
2814     pa_pstream_send_tagstruct(s->context->pstream, t);
2815     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2816
2817     /* Please note that we don't update s->proplist here, because we
2818      * don't export that field */
2819
2820     return o;
2821 }
2822
2823 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2824     pa_operation *o;
2825     pa_tagstruct *t;
2826     uint32_t tag;
2827     const char * const*k;
2828
2829     pa_assert(s);
2830     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2831
2832     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2833     PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2834     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2835     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2836     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2837
2838     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2839
2840     t = pa_tagstruct_command(
2841             s->context,
2842             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2843             &tag);
2844     pa_tagstruct_putu32(t, s->channel);
2845
2846     for (k = keys; *k; k++)
2847         pa_tagstruct_puts(t, *k);
2848
2849     pa_tagstruct_puts(t, NULL);
2850
2851     pa_pstream_send_tagstruct(s->context->pstream, t);
2852     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2853
2854     /* Please note that we don't update s->proplist here, because we
2855      * don't export that field */
2856
2857     return o;
2858 }
2859
2860 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2861     pa_assert(s);
2862     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2863
2864     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2865     PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2866     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2867     PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2868
2869     s->direct_on_input = sink_input_idx;
2870
2871     return 0;
2872 }
2873
2874 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2875     pa_assert(s);
2876     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2877
2878     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2879     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2880     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2881
2882     return s->direct_on_input;
2883 }