protocol-native: Stop auto timing updates if connected to suspended sink or source
[profile/ivi/pulseaudio.git] / src / pulse / stream.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/rtclock.h>
34 #include <pulse/xmalloc.h>
35 #include <pulse/fork-detect.h>
36
37 #include <pulsecore/pstream-util.h>
38 #include <pulsecore/log.h>
39 #include <pulsecore/hashmap.h>
40 #include <pulsecore/macro.h>
41 #include <pulsecore/core-rtclock.h>
42 #include <pulsecore/core-util.h>
43
44 #include "internal.h"
45 #include "stream.h"
46
47 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
48 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
49
50 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
51 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
52 #define SMOOTHER_MIN_HISTORY (4)
53
54 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
55     return pa_stream_new_with_proplist(c, name, ss, map, NULL);
56 }
57
58 static void reset_callbacks(pa_stream *s) {
59     s->read_callback = NULL;
60     s->read_userdata = NULL;
61     s->write_callback = NULL;
62     s->write_userdata = NULL;
63     s->state_callback = NULL;
64     s->state_userdata = NULL;
65     s->overflow_callback = NULL;
66     s->overflow_userdata = NULL;
67     s->underflow_callback = NULL;
68     s->underflow_userdata = NULL;
69     s->latency_update_callback = NULL;
70     s->latency_update_userdata = NULL;
71     s->moved_callback = NULL;
72     s->moved_userdata = NULL;
73     s->suspended_callback = NULL;
74     s->suspended_userdata = NULL;
75     s->started_callback = NULL;
76     s->started_userdata = NULL;
77     s->event_callback = NULL;
78     s->event_userdata = NULL;
79     s->buffer_attr_callback = NULL;
80     s->buffer_attr_userdata = NULL;
81 }
82
83 static pa_stream *pa_stream_new_with_proplist_internal(
84         pa_context *c,
85         const char *name,
86         const pa_sample_spec *ss,
87         const pa_channel_map *map,
88         pa_format_info * const *formats,
89         pa_proplist *p) {
90
91     pa_stream *s;
92     int i;
93
94     pa_assert(c);
95     pa_assert(PA_REFCNT_VALUE(c) >= 1);
96     pa_assert((ss == NULL && map == NULL) || formats == NULL);
97
98     PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
99     PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
100
101     s = pa_xnew(pa_stream, 1);
102     PA_REFCNT_INIT(s);
103     s->context = c;
104     s->mainloop = c->mainloop;
105
106     s->direction = PA_STREAM_NODIRECTION;
107     s->state = PA_STREAM_UNCONNECTED;
108     s->flags = 0;
109
110     if (ss)
111         s->sample_spec = *ss;
112     else
113         s->sample_spec.format = PA_SAMPLE_INVALID;
114
115     if (map)
116         s->channel_map = *map;
117     else
118         pa_channel_map_init(&s->channel_map);
119
120     s->n_formats = 0;
121     if (formats) {
122         for (i = 0; formats[i] && i < PA_MAX_FORMATS; i++) {
123             s->n_formats++;
124             s->req_formats[i] = pa_format_info_copy(formats[i]);
125         }
126         /* Make sure the input array was NULL-terminated */
127         pa_assert(formats[i] == NULL);
128     }
129
130     /* We'll get the final negotiated format after connecting */
131     s->format = NULL;
132
133     s->direct_on_input = PA_INVALID_INDEX;
134
135     s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
136     if (name)
137         pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
138
139     s->channel = 0;
140     s->channel_valid = FALSE;
141     s->syncid = c->csyncid++;
142     s->stream_index = PA_INVALID_INDEX;
143
144     s->requested_bytes = 0;
145     memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
146
147     /* We initialize der target length here, so that if the user
148      * passes no explicit buffering metrics the default is similar to
149      * what older PA versions provided. */
150
151     s->buffer_attr.maxlength = (uint32_t) -1;
152     if (ss)
153         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
154     else {
155         /* FIXME: We assume a worst-case compressed format corresponding to
156          * 48000 Hz, 2 ch, S16 PCM, but this can very well be incorrect */
157         pa_sample_spec tmp_ss = {
158             .format   = PA_SAMPLE_S16NE,
159             .rate     = 48000,
160             .channels = 2,
161         };
162         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &tmp_ss); /* 250ms of buffering */
163     }
164     s->buffer_attr.minreq = (uint32_t) -1;
165     s->buffer_attr.prebuf = (uint32_t) -1;
166     s->buffer_attr.fragsize = (uint32_t) -1;
167
168     s->device_index = PA_INVALID_INDEX;
169     s->device_name = NULL;
170     s->suspended = FALSE;
171     s->corked = FALSE;
172
173     s->write_memblock = NULL;
174     s->write_data = NULL;
175
176     pa_memchunk_reset(&s->peek_memchunk);
177     s->peek_data = NULL;
178     s->record_memblockq = NULL;
179
180     memset(&s->timing_info, 0, sizeof(s->timing_info));
181     s->timing_info_valid = FALSE;
182
183     s->previous_time = 0;
184
185     s->read_index_not_before = 0;
186     s->write_index_not_before = 0;
187     for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
188         s->write_index_corrections[i].valid = 0;
189     s->current_write_index_correction = 0;
190
191     s->auto_timing_update_event = NULL;
192     s->auto_timing_update_requested = FALSE;
193     s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
194
195     reset_callbacks(s);
196
197     s->smoother = NULL;
198
199     /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
200     PA_LLIST_PREPEND(pa_stream, c->streams, s);
201     pa_stream_ref(s);
202
203     return s;
204 }
205
206 pa_stream *pa_stream_new_with_proplist(
207         pa_context *c,
208         const char *name,
209         const pa_sample_spec *ss,
210         const pa_channel_map *map,
211         pa_proplist *p) {
212
213     pa_channel_map tmap;
214
215     PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
216     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
217     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
218     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
219     PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
220
221     if (!map)
222         PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
223
224     return pa_stream_new_with_proplist_internal(c, name, ss, map, NULL, p);
225 }
226
227 pa_stream *pa_stream_new_extended(
228         pa_context *c,
229         const char *name,
230         pa_format_info * const *formats,
231         pa_proplist *p) {
232
233     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 21, PA_ERR_NOTSUPPORTED);
234
235     return pa_stream_new_with_proplist_internal(c, name, NULL, NULL, formats, p);
236 }
237
238 static void stream_unlink(pa_stream *s) {
239     pa_operation *o, *n;
240     pa_assert(s);
241
242     if (!s->context)
243         return;
244
245     /* Detach from context */
246
247     /* Unref all operatio object that point to us */
248     for (o = s->context->operations; o; o = n) {
249         n = o->next;
250
251         if (o->stream == s)
252             pa_operation_cancel(o);
253     }
254
255     /* Drop all outstanding replies for this stream */
256     if (s->context->pdispatch)
257         pa_pdispatch_unregister_reply(s->context->pdispatch, s);
258
259     if (s->channel_valid) {
260         pa_hashmap_remove((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, PA_UINT32_TO_PTR(s->channel));
261         s->channel = 0;
262         s->channel_valid = FALSE;
263     }
264
265     PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
266     pa_stream_unref(s);
267
268     s->context = NULL;
269
270     if (s->auto_timing_update_event) {
271         pa_assert(s->mainloop);
272         s->mainloop->time_free(s->auto_timing_update_event);
273     }
274
275     reset_callbacks(s);
276 }
277
278 static void stream_free(pa_stream *s) {
279     unsigned int i;
280
281     pa_assert(s);
282
283     stream_unlink(s);
284
285     if (s->write_memblock) {
286         pa_memblock_release(s->write_memblock);
287         pa_memblock_unref(s->write_data);
288     }
289
290     if (s->peek_memchunk.memblock) {
291         if (s->peek_data)
292             pa_memblock_release(s->peek_memchunk.memblock);
293         pa_memblock_unref(s->peek_memchunk.memblock);
294     }
295
296     if (s->record_memblockq)
297         pa_memblockq_free(s->record_memblockq);
298
299     if (s->proplist)
300         pa_proplist_free(s->proplist);
301
302     if (s->smoother)
303         pa_smoother_free(s->smoother);
304
305     for (i = 0; i < s->n_formats; i++)
306         pa_xfree(s->req_formats[i]);
307
308     pa_xfree(s->device_name);
309     pa_xfree(s);
310 }
311
312 void pa_stream_unref(pa_stream *s) {
313     pa_assert(s);
314     pa_assert(PA_REFCNT_VALUE(s) >= 1);
315
316     if (PA_REFCNT_DEC(s) <= 0)
317         stream_free(s);
318 }
319
320 pa_stream* pa_stream_ref(pa_stream *s) {
321     pa_assert(s);
322     pa_assert(PA_REFCNT_VALUE(s) >= 1);
323
324     PA_REFCNT_INC(s);
325     return s;
326 }
327
328 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
329     pa_assert(s);
330     pa_assert(PA_REFCNT_VALUE(s) >= 1);
331
332     return s->state;
333 }
334
335 pa_context* pa_stream_get_context(pa_stream *s) {
336     pa_assert(s);
337     pa_assert(PA_REFCNT_VALUE(s) >= 1);
338
339     return s->context;
340 }
341
342 uint32_t pa_stream_get_index(pa_stream *s) {
343     pa_assert(s);
344     pa_assert(PA_REFCNT_VALUE(s) >= 1);
345
346     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
347     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
348
349     return s->stream_index;
350 }
351
352 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
353     pa_assert(s);
354     pa_assert(PA_REFCNT_VALUE(s) >= 1);
355
356     if (s->state == st)
357         return;
358
359     pa_stream_ref(s);
360
361     s->state = st;
362
363     if (s->state_callback)
364         s->state_callback(s, s->state_userdata);
365
366     if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
367         stream_unlink(s);
368
369     pa_stream_unref(s);
370 }
371
372 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
373     pa_assert(s);
374     pa_assert(PA_REFCNT_VALUE(s) >= 1);
375
376     if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
377         return;
378
379     if (s->state == PA_STREAM_READY &&
380         (force || !s->auto_timing_update_requested)) {
381         pa_operation *o;
382
383 /*         pa_log("Automatically requesting new timing data"); */
384
385         if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
386             pa_operation_unref(o);
387             s->auto_timing_update_requested = TRUE;
388         }
389     }
390
391     if (s->auto_timing_update_event) {
392         if (s->suspended && !force) {
393             pa_assert(s->mainloop);
394             s->mainloop->time_free(s->auto_timing_update_event);
395             s->auto_timing_update_event = NULL;
396         } else {
397             if (force)
398                 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
399
400             pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
401
402             s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
403         }
404     }
405 }
406
407 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
408     pa_context *c = userdata;
409     pa_stream *s;
410     uint32_t channel;
411
412     pa_assert(pd);
413     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
414     pa_assert(t);
415     pa_assert(c);
416     pa_assert(PA_REFCNT_VALUE(c) >= 1);
417
418     pa_context_ref(c);
419
420     if (pa_tagstruct_getu32(t, &channel) < 0 ||
421         !pa_tagstruct_eof(t)) {
422         pa_context_fail(c, PA_ERR_PROTOCOL);
423         goto finish;
424     }
425
426     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
427         goto finish;
428
429     if (s->state != PA_STREAM_READY)
430         goto finish;
431
432     pa_context_set_error(c, PA_ERR_KILLED);
433     pa_stream_set_state(s, PA_STREAM_FAILED);
434
435 finish:
436     pa_context_unref(c);
437 }
438
439 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
440     pa_usec_t x;
441
442     pa_assert(s);
443     pa_assert(!force_start || !force_stop);
444
445     if (!s->smoother)
446         return;
447
448     x = pa_rtclock_now();
449
450     if (s->timing_info_valid) {
451         if (aposteriori)
452             x -= s->timing_info.transport_usec;
453         else
454             x += s->timing_info.transport_usec;
455     }
456
457     if (s->suspended || s->corked || force_stop)
458         pa_smoother_pause(s->smoother, x);
459     else if (force_start || s->buffer_attr.prebuf == 0) {
460
461         if (!s->timing_info_valid &&
462             !aposteriori &&
463             !force_start &&
464             !force_stop &&
465             s->context->version >= 13) {
466
467             /* If the server supports STARTED events we take them as
468              * indications when audio really starts/stops playing, if
469              * we don't have any timing info yet -- instead of trying
470              * to be smart and guessing the server time. Otherwise the
471              * unknown transport delay add too much noise to our time
472              * calculations. */
473
474             return;
475         }
476
477         pa_smoother_resume(s->smoother, x, TRUE);
478     }
479
480     /* Please note that we have no idea if playback actually started
481      * if prebuf is non-zero! */
482 }
483
484 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata);
485
486 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
487     pa_context *c = userdata;
488     pa_stream *s;
489     uint32_t channel;
490     const char *dn;
491     pa_bool_t suspended;
492     uint32_t di;
493     pa_usec_t usec = 0;
494     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
495
496     pa_assert(pd);
497     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
498     pa_assert(t);
499     pa_assert(c);
500     pa_assert(PA_REFCNT_VALUE(c) >= 1);
501
502     pa_context_ref(c);
503
504     if (c->version < 12) {
505         pa_context_fail(c, PA_ERR_PROTOCOL);
506         goto finish;
507     }
508
509     if (pa_tagstruct_getu32(t, &channel) < 0 ||
510         pa_tagstruct_getu32(t, &di) < 0 ||
511         pa_tagstruct_gets(t, &dn) < 0 ||
512         pa_tagstruct_get_boolean(t, &suspended) < 0) {
513         pa_context_fail(c, PA_ERR_PROTOCOL);
514         goto finish;
515     }
516
517     if (c->version >= 13) {
518
519         if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
520             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
521                 pa_tagstruct_getu32(t, &fragsize) < 0 ||
522                 pa_tagstruct_get_usec(t, &usec) < 0) {
523                 pa_context_fail(c, PA_ERR_PROTOCOL);
524                 goto finish;
525             }
526         } else {
527             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
528                 pa_tagstruct_getu32(t, &tlength) < 0 ||
529                 pa_tagstruct_getu32(t, &prebuf) < 0 ||
530                 pa_tagstruct_getu32(t, &minreq) < 0 ||
531                 pa_tagstruct_get_usec(t, &usec) < 0) {
532                 pa_context_fail(c, PA_ERR_PROTOCOL);
533                 goto finish;
534             }
535         }
536     }
537
538     if (!pa_tagstruct_eof(t)) {
539         pa_context_fail(c, PA_ERR_PROTOCOL);
540         goto finish;
541     }
542
543     if (!dn || di == PA_INVALID_INDEX) {
544         pa_context_fail(c, PA_ERR_PROTOCOL);
545         goto finish;
546     }
547
548     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
549         goto finish;
550
551     if (s->state != PA_STREAM_READY)
552         goto finish;
553
554     if (c->version >= 13) {
555         if (s->direction == PA_STREAM_RECORD)
556             s->timing_info.configured_source_usec = usec;
557         else
558             s->timing_info.configured_sink_usec = usec;
559
560         s->buffer_attr.maxlength = maxlength;
561         s->buffer_attr.fragsize = fragsize;
562         s->buffer_attr.tlength = tlength;
563         s->buffer_attr.prebuf = prebuf;
564         s->buffer_attr.minreq = minreq;
565     }
566
567     pa_xfree(s->device_name);
568     s->device_name = pa_xstrdup(dn);
569     s->device_index = di;
570
571     s->suspended = suspended;
572
573     if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
574         s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
575         s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
576         request_auto_timing_update(s, TRUE);
577     }
578
579     check_smoother_status(s, TRUE, FALSE, FALSE);
580     request_auto_timing_update(s, TRUE);
581
582     if (s->moved_callback)
583         s->moved_callback(s, s->moved_userdata);
584
585 finish:
586     pa_context_unref(c);
587 }
588
589 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
590     pa_context *c = userdata;
591     pa_stream *s;
592     uint32_t channel;
593     pa_usec_t usec = 0;
594     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
595
596     pa_assert(pd);
597     pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
598     pa_assert(t);
599     pa_assert(c);
600     pa_assert(PA_REFCNT_VALUE(c) >= 1);
601
602     pa_context_ref(c);
603
604     if (c->version < 15) {
605         pa_context_fail(c, PA_ERR_PROTOCOL);
606         goto finish;
607     }
608
609     if (pa_tagstruct_getu32(t, &channel) < 0) {
610         pa_context_fail(c, PA_ERR_PROTOCOL);
611         goto finish;
612     }
613
614     if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
615         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
616             pa_tagstruct_getu32(t, &fragsize) < 0 ||
617             pa_tagstruct_get_usec(t, &usec) < 0) {
618             pa_context_fail(c, PA_ERR_PROTOCOL);
619             goto finish;
620         }
621     } else {
622         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
623             pa_tagstruct_getu32(t, &tlength) < 0 ||
624             pa_tagstruct_getu32(t, &prebuf) < 0 ||
625             pa_tagstruct_getu32(t, &minreq) < 0 ||
626             pa_tagstruct_get_usec(t, &usec) < 0) {
627             pa_context_fail(c, PA_ERR_PROTOCOL);
628             goto finish;
629         }
630     }
631
632     if (!pa_tagstruct_eof(t)) {
633         pa_context_fail(c, PA_ERR_PROTOCOL);
634         goto finish;
635     }
636
637     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
638         goto finish;
639
640     if (s->state != PA_STREAM_READY)
641         goto finish;
642
643     if (s->direction == PA_STREAM_RECORD)
644         s->timing_info.configured_source_usec = usec;
645     else
646         s->timing_info.configured_sink_usec = usec;
647
648     s->buffer_attr.maxlength = maxlength;
649     s->buffer_attr.fragsize = fragsize;
650     s->buffer_attr.tlength = tlength;
651     s->buffer_attr.prebuf = prebuf;
652     s->buffer_attr.minreq = minreq;
653
654     request_auto_timing_update(s, TRUE);
655
656     if (s->buffer_attr_callback)
657         s->buffer_attr_callback(s, s->buffer_attr_userdata);
658
659 finish:
660     pa_context_unref(c);
661 }
662
663 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
664     pa_context *c = userdata;
665     pa_stream *s;
666     uint32_t channel;
667     pa_bool_t suspended;
668
669     pa_assert(pd);
670     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
671     pa_assert(t);
672     pa_assert(c);
673     pa_assert(PA_REFCNT_VALUE(c) >= 1);
674
675     pa_context_ref(c);
676
677     if (c->version < 12) {
678         pa_context_fail(c, PA_ERR_PROTOCOL);
679         goto finish;
680     }
681
682     if (pa_tagstruct_getu32(t, &channel) < 0 ||
683         pa_tagstruct_get_boolean(t, &suspended) < 0 ||
684         !pa_tagstruct_eof(t)) {
685         pa_context_fail(c, PA_ERR_PROTOCOL);
686         goto finish;
687     }
688
689     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
690         goto finish;
691
692     if (s->state != PA_STREAM_READY)
693         goto finish;
694
695     s->suspended = suspended;
696
697     if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
698         s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
699         s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
700         request_auto_timing_update(s, TRUE);
701     }
702
703     check_smoother_status(s, TRUE, FALSE, FALSE);
704     request_auto_timing_update(s, TRUE);
705
706     if (s->suspended_callback)
707         s->suspended_callback(s, s->suspended_userdata);
708
709 finish:
710     pa_context_unref(c);
711 }
712
713 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
714     pa_context *c = userdata;
715     pa_stream *s;
716     uint32_t channel;
717
718     pa_assert(pd);
719     pa_assert(command == PA_COMMAND_STARTED);
720     pa_assert(t);
721     pa_assert(c);
722     pa_assert(PA_REFCNT_VALUE(c) >= 1);
723
724     pa_context_ref(c);
725
726     if (c->version < 13) {
727         pa_context_fail(c, PA_ERR_PROTOCOL);
728         goto finish;
729     }
730
731     if (pa_tagstruct_getu32(t, &channel) < 0 ||
732         !pa_tagstruct_eof(t)) {
733         pa_context_fail(c, PA_ERR_PROTOCOL);
734         goto finish;
735     }
736
737     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
738         goto finish;
739
740     if (s->state != PA_STREAM_READY)
741         goto finish;
742
743     check_smoother_status(s, TRUE, TRUE, FALSE);
744     request_auto_timing_update(s, TRUE);
745
746     if (s->started_callback)
747         s->started_callback(s, s->started_userdata);
748
749 finish:
750     pa_context_unref(c);
751 }
752
753 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
754     pa_context *c = userdata;
755     pa_stream *s;
756     uint32_t channel;
757     pa_proplist *pl = NULL;
758     const char *event;
759
760     pa_assert(pd);
761     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
762     pa_assert(t);
763     pa_assert(c);
764     pa_assert(PA_REFCNT_VALUE(c) >= 1);
765
766     pa_context_ref(c);
767
768     if (c->version < 15) {
769         pa_context_fail(c, PA_ERR_PROTOCOL);
770         goto finish;
771     }
772
773     pl = pa_proplist_new();
774
775     if (pa_tagstruct_getu32(t, &channel) < 0 ||
776         pa_tagstruct_gets(t, &event) < 0 ||
777         pa_tagstruct_get_proplist(t, pl) < 0 ||
778         !pa_tagstruct_eof(t) || !event) {
779         pa_context_fail(c, PA_ERR_PROTOCOL);
780         goto finish;
781     }
782
783     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
784         goto finish;
785
786     if (s->state != PA_STREAM_READY)
787         goto finish;
788
789     if (pa_streq(event, PA_STREAM_EVENT_FORMAT_LOST)) {
790         /* Let client know what the running time was when the stream had to be
791          * killed  */
792         pa_usec_t time;
793         if (pa_stream_get_time(s, &time) == 0)
794             pa_proplist_setf(pl, "stream-time", "%llu", (unsigned long long) time);
795     }
796
797     if (s->event_callback)
798         s->event_callback(s, event, pl, s->event_userdata);
799
800 finish:
801     pa_context_unref(c);
802
803     if (pl)
804         pa_proplist_free(pl);
805 }
806
807 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
808     pa_stream *s;
809     pa_context *c = userdata;
810     uint32_t bytes, channel;
811
812     pa_assert(pd);
813     pa_assert(command == PA_COMMAND_REQUEST);
814     pa_assert(t);
815     pa_assert(c);
816     pa_assert(PA_REFCNT_VALUE(c) >= 1);
817
818     pa_context_ref(c);
819
820     if (pa_tagstruct_getu32(t, &channel) < 0 ||
821         pa_tagstruct_getu32(t, &bytes) < 0 ||
822         !pa_tagstruct_eof(t)) {
823         pa_context_fail(c, PA_ERR_PROTOCOL);
824         goto finish;
825     }
826
827     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
828         goto finish;
829
830     if (s->state != PA_STREAM_READY)
831         goto finish;
832
833     s->requested_bytes += bytes;
834
835     /* pa_log("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes); */
836
837     if (s->requested_bytes > 0 && s->write_callback)
838         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
839
840 finish:
841     pa_context_unref(c);
842 }
843
844 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
845     pa_stream *s;
846     pa_context *c = userdata;
847     uint32_t channel;
848
849     pa_assert(pd);
850     pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
851     pa_assert(t);
852     pa_assert(c);
853     pa_assert(PA_REFCNT_VALUE(c) >= 1);
854
855     pa_context_ref(c);
856
857     if (pa_tagstruct_getu32(t, &channel) < 0 ||
858         !pa_tagstruct_eof(t)) {
859         pa_context_fail(c, PA_ERR_PROTOCOL);
860         goto finish;
861     }
862
863     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
864         goto finish;
865
866     if (s->state != PA_STREAM_READY)
867         goto finish;
868
869     if (s->buffer_attr.prebuf > 0)
870         check_smoother_status(s, TRUE, FALSE, TRUE);
871
872     request_auto_timing_update(s, TRUE);
873
874     if (command == PA_COMMAND_OVERFLOW) {
875         if (s->overflow_callback)
876             s->overflow_callback(s, s->overflow_userdata);
877     } else if (command == PA_COMMAND_UNDERFLOW) {
878         if (s->underflow_callback)
879             s->underflow_callback(s, s->underflow_userdata);
880     }
881
882 finish:
883     pa_context_unref(c);
884 }
885
886 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
887     pa_assert(s);
888     pa_assert(PA_REFCNT_VALUE(s) >= 1);
889
890 /*     pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
891
892     if (s->state != PA_STREAM_READY)
893         return;
894
895     if (w) {
896         s->write_index_not_before = s->context->ctag;
897
898         if (s->timing_info_valid)
899             s->timing_info.write_index_corrupt = TRUE;
900
901 /*         pa_log("write_index invalidated"); */
902     }
903
904     if (r) {
905         s->read_index_not_before = s->context->ctag;
906
907         if (s->timing_info_valid)
908             s->timing_info.read_index_corrupt = TRUE;
909
910 /*         pa_log("read_index invalidated"); */
911     }
912
913     request_auto_timing_update(s, TRUE);
914 }
915
916 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
917     pa_stream *s = userdata;
918
919     pa_assert(s);
920     pa_assert(PA_REFCNT_VALUE(s) >= 1);
921
922     pa_stream_ref(s);
923     request_auto_timing_update(s, FALSE);
924     pa_stream_unref(s);
925 }
926
927 static void create_stream_complete(pa_stream *s) {
928     pa_assert(s);
929     pa_assert(PA_REFCNT_VALUE(s) >= 1);
930     pa_assert(s->state == PA_STREAM_CREATING);
931
932     pa_stream_set_state(s, PA_STREAM_READY);
933
934     if (s->requested_bytes > 0 && s->write_callback)
935         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
936
937     if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
938         s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
939         pa_assert(!s->auto_timing_update_event);
940         s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
941
942         request_auto_timing_update(s, TRUE);
943     }
944
945     check_smoother_status(s, TRUE, FALSE, FALSE);
946 }
947
948 static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
949     const char *e;
950
951     pa_assert(s);
952     pa_assert(attr);
953
954     if ((e = getenv("PULSE_LATENCY_MSEC"))) {
955         uint32_t ms;
956
957         if (pa_atou(e, &ms) < 0 || ms <= 0)
958             pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
959         else {
960             attr->maxlength = (uint32_t) -1;
961             attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &s->sample_spec);
962             attr->minreq = (uint32_t) -1;
963             attr->prebuf = (uint32_t) -1;
964             attr->fragsize = attr->tlength;
965         }
966
967         if (flags)
968             *flags |= PA_STREAM_ADJUST_LATENCY;
969     }
970
971     if (s->context->version >= 13)
972         return;
973
974     /* Version older than 0.9.10 didn't do server side buffer_attr
975      * selection, hence we have to fake it on the client side. */
976
977     /* We choose fairly conservative values here, to not confuse
978      * old clients with extremely large playback buffers */
979
980     if (attr->maxlength == (uint32_t) -1)
981         attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
982
983     if (attr->tlength == (uint32_t) -1)
984         attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */
985
986     if (attr->minreq == (uint32_t) -1)
987         attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
988
989     if (attr->prebuf == (uint32_t) -1)
990         attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
991
992     if (attr->fragsize == (uint32_t) -1)
993         attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
994 }
995
996 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
997     pa_stream *s = userdata;
998     uint32_t requested_bytes = 0;
999
1000     pa_assert(pd);
1001     pa_assert(s);
1002     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1003     pa_assert(s->state == PA_STREAM_CREATING);
1004
1005     pa_stream_ref(s);
1006
1007     if (command != PA_COMMAND_REPLY) {
1008         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1009             goto finish;
1010
1011         pa_stream_set_state(s, PA_STREAM_FAILED);
1012         goto finish;
1013     }
1014
1015     if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
1016         s->channel == PA_INVALID_INDEX ||
1017         ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
1018         ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
1019         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1020         goto finish;
1021     }
1022
1023     s->requested_bytes = (int64_t) requested_bytes;
1024
1025     if (s->context->version >= 9) {
1026         if (s->direction == PA_STREAM_PLAYBACK) {
1027             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1028                 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
1029                 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
1030                 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
1031                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1032                 goto finish;
1033             }
1034         } else if (s->direction == PA_STREAM_RECORD) {
1035             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1036                 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
1037                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1038                 goto finish;
1039             }
1040         }
1041     }
1042
1043     if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
1044         pa_sample_spec ss;
1045         pa_channel_map cm;
1046         const char *dn = NULL;
1047         pa_bool_t suspended;
1048
1049         if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1050             pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1051             pa_tagstruct_getu32(t, &s->device_index) < 0 ||
1052             pa_tagstruct_gets(t, &dn) < 0 ||
1053             pa_tagstruct_get_boolean(t, &suspended) < 0) {
1054             pa_context_fail(s->context, PA_ERR_PROTOCOL);
1055             goto finish;
1056         }
1057
1058         if (!dn || s->device_index == PA_INVALID_INDEX ||
1059             ss.channels != cm.channels ||
1060             !pa_channel_map_valid(&cm) ||
1061             !pa_sample_spec_valid(&ss) ||
1062             (s->n_formats == 0 && (
1063                 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
1064                 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
1065                 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))))) {
1066             pa_context_fail(s->context, PA_ERR_PROTOCOL);
1067             goto finish;
1068         }
1069
1070         pa_xfree(s->device_name);
1071         s->device_name = pa_xstrdup(dn);
1072         s->suspended = suspended;
1073
1074         s->channel_map = cm;
1075         s->sample_spec = ss;
1076     }
1077
1078     if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
1079         pa_usec_t usec;
1080
1081         if (pa_tagstruct_get_usec(t, &usec) < 0) {
1082             pa_context_fail(s->context, PA_ERR_PROTOCOL);
1083             goto finish;
1084         }
1085
1086         if (s->direction == PA_STREAM_RECORD)
1087             s->timing_info.configured_source_usec = usec;
1088         else
1089             s->timing_info.configured_sink_usec = usec;
1090     }
1091
1092     if (s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK) {
1093         pa_format_info *f = pa_format_info_new();
1094         pa_tagstruct_get_format_info(t, f);
1095
1096         if (pa_format_info_valid(f))
1097             s->format = f;
1098         else {
1099             pa_format_info_free(f);
1100             if (s->n_formats > 0) {
1101                 /* We used the extended API, so we should have got back a proper format */
1102                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1103                 goto finish;
1104             }
1105         }
1106     }
1107
1108     if (!pa_tagstruct_eof(t)) {
1109         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1110         goto finish;
1111     }
1112
1113     if (s->direction == PA_STREAM_RECORD) {
1114         pa_assert(!s->record_memblockq);
1115
1116         s->record_memblockq = pa_memblockq_new(
1117                 0,
1118                 s->buffer_attr.maxlength,
1119                 0,
1120                 pa_frame_size(&s->sample_spec),
1121                 1,
1122                 0,
1123                 0,
1124                 NULL);
1125     }
1126
1127     s->channel_valid = TRUE;
1128     pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);
1129
1130     create_stream_complete(s);
1131
1132 finish:
1133     pa_stream_unref(s);
1134 }
1135
1136 static int create_stream(
1137         pa_stream_direction_t direction,
1138         pa_stream *s,
1139         const char *dev,
1140         const pa_buffer_attr *attr,
1141         pa_stream_flags_t flags,
1142         const pa_cvolume *volume,
1143         pa_stream *sync_stream) {
1144
1145     pa_tagstruct *t;
1146     uint32_t tag;
1147     pa_bool_t volume_set = FALSE;
1148     uint32_t i;
1149
1150     pa_assert(s);
1151     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1152     pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1153
1154     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1155     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1156     PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1157     PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1158                                               PA_STREAM_INTERPOLATE_TIMING|
1159                                               PA_STREAM_NOT_MONOTONIC|
1160                                               PA_STREAM_AUTO_TIMING_UPDATE|
1161                                               PA_STREAM_NO_REMAP_CHANNELS|
1162                                               PA_STREAM_NO_REMIX_CHANNELS|
1163                                               PA_STREAM_FIX_FORMAT|
1164                                               PA_STREAM_FIX_RATE|
1165                                               PA_STREAM_FIX_CHANNELS|
1166                                               PA_STREAM_DONT_MOVE|
1167                                               PA_STREAM_VARIABLE_RATE|
1168                                               PA_STREAM_PEAK_DETECT|
1169                                               PA_STREAM_START_MUTED|
1170                                               PA_STREAM_ADJUST_LATENCY|
1171                                               PA_STREAM_EARLY_REQUESTS|
1172                                               PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1173                                               PA_STREAM_START_UNMUTED|
1174                                               PA_STREAM_FAIL_ON_SUSPEND|
1175                                               PA_STREAM_RELATIVE_VOLUME|
1176                                               PA_STREAM_PASSTHROUGH)), PA_ERR_INVALID);
1177
1178
1179     PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1180     PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1181     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1182     /* Althought some of the other flags are not supported on older
1183      * version, we don't check for them here, because it doesn't hurt
1184      * when they are passed but actually not supported. This makes
1185      * client development easier */
1186
1187     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1188     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1189     PA_CHECK_VALIDITY(s->context, !volume || (pa_sample_spec_valid(&s->sample_spec) && volume->channels == s->sample_spec.channels), PA_ERR_INVALID);
1190     PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1191     PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1192
1193     pa_stream_ref(s);
1194
1195     s->direction = direction;
1196
1197     if (sync_stream)
1198         s->syncid = sync_stream->syncid;
1199
1200     if (attr)
1201         s->buffer_attr = *attr;
1202     patch_buffer_attr(s, &s->buffer_attr, &flags);
1203
1204     s->flags = flags;
1205     s->corked = !!(flags & PA_STREAM_START_CORKED);
1206
1207     if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1208         pa_usec_t x;
1209
1210         x = pa_rtclock_now();
1211
1212         pa_assert(!s->smoother);
1213         s->smoother = pa_smoother_new(
1214                 SMOOTHER_ADJUST_TIME,
1215                 SMOOTHER_HISTORY_TIME,
1216                 !(flags & PA_STREAM_NOT_MONOTONIC),
1217                 TRUE,
1218                 SMOOTHER_MIN_HISTORY,
1219                 x,
1220                 TRUE);
1221     }
1222
1223     if (!dev)
1224         dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1225
1226     t = pa_tagstruct_command(
1227             s->context,
1228             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1229             &tag);
1230
1231     if (s->context->version < 13)
1232         pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1233
1234     pa_tagstruct_put(
1235             t,
1236             PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1237             PA_TAG_CHANNEL_MAP, &s->channel_map,
1238             PA_TAG_U32, PA_INVALID_INDEX,
1239             PA_TAG_STRING, dev,
1240             PA_TAG_U32, s->buffer_attr.maxlength,
1241             PA_TAG_BOOLEAN, s->corked,
1242             PA_TAG_INVALID);
1243
1244     if (s->direction == PA_STREAM_PLAYBACK) {
1245         pa_cvolume cv;
1246
1247         pa_tagstruct_put(
1248                 t,
1249                 PA_TAG_U32, s->buffer_attr.tlength,
1250                 PA_TAG_U32, s->buffer_attr.prebuf,
1251                 PA_TAG_U32, s->buffer_attr.minreq,
1252                 PA_TAG_U32, s->syncid,
1253                 PA_TAG_INVALID);
1254
1255         volume_set = !!volume;
1256
1257         if (!volume) {
1258             if (pa_sample_spec_valid(&s->sample_spec))
1259                 volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1260             else {
1261                 /* This is not really relevant, since no volume was set, and
1262                  * the real number of channels is embedded in the format_info
1263                  * structure */
1264                 volume = pa_cvolume_reset(&cv, PA_CHANNELS_MAX);
1265             }
1266         }
1267
1268         pa_tagstruct_put_cvolume(t, volume);
1269     } else
1270         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1271
1272     if (s->context->version >= 12) {
1273         pa_tagstruct_put(
1274                 t,
1275                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1276                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1277                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1278                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1279                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1280                 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1281                 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1282                 PA_TAG_INVALID);
1283     }
1284
1285     if (s->context->version >= 13) {
1286
1287         if (s->direction == PA_STREAM_PLAYBACK)
1288             pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1289         else
1290             pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1291
1292         pa_tagstruct_put(
1293                 t,
1294                 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1295                 PA_TAG_PROPLIST, s->proplist,
1296                 PA_TAG_INVALID);
1297
1298         if (s->direction == PA_STREAM_RECORD)
1299             pa_tagstruct_putu32(t, s->direct_on_input);
1300     }
1301
1302     if (s->context->version >= 14) {
1303
1304         if (s->direction == PA_STREAM_PLAYBACK)
1305             pa_tagstruct_put_boolean(t, volume_set);
1306
1307         pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1308     }
1309
1310     if (s->context->version >= 15) {
1311
1312         if (s->direction == PA_STREAM_PLAYBACK)
1313             pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1314
1315         pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1316         pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1317     }
1318
1319     if (s->context->version >= 17) {
1320
1321         if (s->direction == PA_STREAM_PLAYBACK)
1322             pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1323
1324     }
1325
1326     if (s->context->version >= 18) {
1327
1328         if (s->direction == PA_STREAM_PLAYBACK)
1329             pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1330     }
1331
1332     if (s->context->version >= 21) {
1333
1334         if (s->direction == PA_STREAM_PLAYBACK) {
1335             pa_tagstruct_putu8(t, s->n_formats);
1336             for (i = 0; i < s->n_formats; i++)
1337                 pa_tagstruct_put_format_info(t, s->req_formats[i]);
1338         }
1339     }
1340
1341     pa_pstream_send_tagstruct(s->context->pstream, t);
1342     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1343
1344     pa_stream_set_state(s, PA_STREAM_CREATING);
1345
1346     pa_stream_unref(s);
1347     return 0;
1348 }
1349
1350 int pa_stream_connect_playback(
1351         pa_stream *s,
1352         const char *dev,
1353         const pa_buffer_attr *attr,
1354         pa_stream_flags_t flags,
1355         const pa_cvolume *volume,
1356         pa_stream *sync_stream) {
1357
1358     pa_assert(s);
1359     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1360
1361     return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1362 }
1363
1364 int pa_stream_connect_record(
1365         pa_stream *s,
1366         const char *dev,
1367         const pa_buffer_attr *attr,
1368         pa_stream_flags_t flags) {
1369
1370     pa_assert(s);
1371     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1372
1373     return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1374 }
1375
1376 int pa_stream_begin_write(
1377         pa_stream *s,
1378         void **data,
1379         size_t *nbytes) {
1380
1381     pa_assert(s);
1382     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1383
1384     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1385     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1386     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1387     PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1388     PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1389
1390     if (*nbytes != (size_t) -1) {
1391         size_t m, fs;
1392
1393         m = pa_mempool_block_size_max(s->context->mempool);
1394         fs = pa_frame_size(&s->sample_spec);
1395
1396         m = (m / fs) * fs;
1397         if (*nbytes > m)
1398             *nbytes = m;
1399     }
1400
1401     if (!s->write_memblock) {
1402         s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1403         s->write_data = pa_memblock_acquire(s->write_memblock);
1404     }
1405
1406     *data = s->write_data;
1407     *nbytes = pa_memblock_get_length(s->write_memblock);
1408
1409     return 0;
1410 }
1411
1412 int pa_stream_cancel_write(
1413         pa_stream *s) {
1414
1415     pa_assert(s);
1416     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1417
1418     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1419     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1420     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1421     PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1422
1423     pa_assert(s->write_data);
1424
1425     pa_memblock_release(s->write_memblock);
1426     pa_memblock_unref(s->write_memblock);
1427     s->write_memblock = NULL;
1428     s->write_data = NULL;
1429
1430     return 0;
1431 }
1432
1433 int pa_stream_write(
1434         pa_stream *s,
1435         const void *data,
1436         size_t length,
1437         pa_free_cb_t free_cb,
1438         int64_t offset,
1439         pa_seek_mode_t seek) {
1440
1441     pa_assert(s);
1442     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1443     pa_assert(data);
1444
1445     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1446     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1447     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1448     PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1449     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1450     PA_CHECK_VALIDITY(s->context,
1451                       !s->write_memblock ||
1452                       ((data >= s->write_data) &&
1453                        ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1454                       PA_ERR_INVALID);
1455     PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1456
1457     if (s->write_memblock) {
1458         pa_memchunk chunk;
1459
1460         /* pa_stream_write_begin() was called before */
1461
1462         pa_memblock_release(s->write_memblock);
1463
1464         chunk.memblock = s->write_memblock;
1465         chunk.index = (const char *) data - (const char *) s->write_data;
1466         chunk.length = length;
1467
1468         s->write_memblock = NULL;
1469         s->write_data = NULL;
1470
1471         pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1472         pa_memblock_unref(chunk.memblock);
1473
1474     } else {
1475         pa_seek_mode_t t_seek = seek;
1476         int64_t t_offset = offset;
1477         size_t t_length = length;
1478         const void *t_data = data;
1479
1480         /* pa_stream_write_begin() was not called before */
1481
1482         while (t_length > 0) {
1483             pa_memchunk chunk;
1484
1485             chunk.index = 0;
1486
1487             if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1488                 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1489                 chunk.length = t_length;
1490             } else {
1491                 void *d;
1492
1493                 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1494                 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1495
1496                 d = pa_memblock_acquire(chunk.memblock);
1497                 memcpy(d, t_data, chunk.length);
1498                 pa_memblock_release(chunk.memblock);
1499             }
1500
1501             pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1502
1503             t_offset = 0;
1504             t_seek = PA_SEEK_RELATIVE;
1505
1506             t_data = (const uint8_t*) t_data + chunk.length;
1507             t_length -= chunk.length;
1508
1509             pa_memblock_unref(chunk.memblock);
1510         }
1511
1512         if (free_cb && pa_pstream_get_shm(s->context->pstream))
1513             free_cb((void*) data);
1514     }
1515
1516     /* This is obviously wrong since we ignore the seeking index . But
1517      * that's OK, the server side applies the same error */
1518     s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1519
1520     /* pa_log("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes); */
1521
1522     if (s->direction == PA_STREAM_PLAYBACK) {
1523
1524         /* Update latency request correction */
1525         if (s->write_index_corrections[s->current_write_index_correction].valid) {
1526
1527             if (seek == PA_SEEK_ABSOLUTE) {
1528                 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1529                 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1530                 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1531             } else if (seek == PA_SEEK_RELATIVE) {
1532                 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1533                     s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1534             } else
1535                 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1536         }
1537
1538         /* Update the write index in the already available latency data */
1539         if (s->timing_info_valid) {
1540
1541             if (seek == PA_SEEK_ABSOLUTE) {
1542                 s->timing_info.write_index_corrupt = FALSE;
1543                 s->timing_info.write_index = offset + (int64_t) length;
1544             } else if (seek == PA_SEEK_RELATIVE) {
1545                 if (!s->timing_info.write_index_corrupt)
1546                     s->timing_info.write_index += offset + (int64_t) length;
1547             } else
1548                 s->timing_info.write_index_corrupt = TRUE;
1549         }
1550
1551         if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1552             request_auto_timing_update(s, TRUE);
1553     }
1554
1555     return 0;
1556 }
1557
1558 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1559     pa_assert(s);
1560     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1561     pa_assert(data);
1562     pa_assert(length);
1563
1564     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1565     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1566     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1567
1568     if (!s->peek_memchunk.memblock) {
1569
1570         if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1571             *data = NULL;
1572             *length = 0;
1573             return 0;
1574         }
1575
1576         s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1577     }
1578
1579     pa_assert(s->peek_data);
1580     *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1581     *length = s->peek_memchunk.length;
1582     return 0;
1583 }
1584
1585 int pa_stream_drop(pa_stream *s) {
1586     pa_assert(s);
1587     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1588
1589     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1590     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1591     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1592     PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1593
1594     pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1595
1596     /* Fix the simulated local read index */
1597     if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1598         s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1599
1600     pa_assert(s->peek_data);
1601     pa_memblock_release(s->peek_memchunk.memblock);
1602     pa_memblock_unref(s->peek_memchunk.memblock);
1603     pa_memchunk_reset(&s->peek_memchunk);
1604
1605     return 0;
1606 }
1607
1608 size_t pa_stream_writable_size(pa_stream *s) {
1609     pa_assert(s);
1610     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1611
1612     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1613     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1614     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1615
1616     return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1617 }
1618
1619 size_t pa_stream_readable_size(pa_stream *s) {
1620     pa_assert(s);
1621     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1622
1623     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1624     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1625     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1626
1627     return pa_memblockq_get_length(s->record_memblockq);
1628 }
1629
1630 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1631     pa_operation *o;
1632     pa_tagstruct *t;
1633     uint32_t tag;
1634
1635     pa_assert(s);
1636     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1637
1638     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1639     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1640     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1641
1642     /* Ask for a timing update before we cork/uncork to get the best
1643      * accuracy for the transport latency suitable for the
1644      * check_smoother_status() call in the started callback */
1645     request_auto_timing_update(s, TRUE);
1646
1647     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1648
1649     t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1650     pa_tagstruct_putu32(t, s->channel);
1651     pa_pstream_send_tagstruct(s->context->pstream, t);
1652     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1653
1654     /* This might cause the read index to conitnue again, hence
1655      * let's request a timing update */
1656     request_auto_timing_update(s, TRUE);
1657
1658     return o;
1659 }
1660
1661 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1662     pa_usec_t usec;
1663
1664     pa_assert(s);
1665     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1666     pa_assert(s->state == PA_STREAM_READY);
1667     pa_assert(s->direction != PA_STREAM_UPLOAD);
1668     pa_assert(s->timing_info_valid);
1669     pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1670     pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1671
1672     if (s->direction == PA_STREAM_PLAYBACK) {
1673         /* The last byte that was written into the output device
1674          * had this time value associated */
1675         usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1676
1677         if (!s->corked && !s->suspended) {
1678
1679             if (!ignore_transport)
1680                 /* Because the latency info took a little time to come
1681                  * to us, we assume that the real output time is actually
1682                  * a little ahead */
1683                 usec += s->timing_info.transport_usec;
1684
1685             /* However, the output device usually maintains a buffer
1686                too, hence the real sample currently played is a little
1687                back  */
1688             if (s->timing_info.sink_usec >= usec)
1689                 usec = 0;
1690             else
1691                 usec -= s->timing_info.sink_usec;
1692         }
1693
1694     } else {
1695         pa_assert(s->direction == PA_STREAM_RECORD);
1696
1697         /* The last byte written into the server side queue had
1698          * this time value associated */
1699         usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1700
1701         if (!s->corked && !s->suspended) {
1702
1703             if (!ignore_transport)
1704                 /* Add transport latency */
1705                 usec += s->timing_info.transport_usec;
1706
1707             /* Add latency of data in device buffer */
1708             usec += s->timing_info.source_usec;
1709
1710             /* If this is a monitor source, we need to correct the
1711              * time by the playback device buffer */
1712             if (s->timing_info.sink_usec >= usec)
1713                 usec = 0;
1714             else
1715                 usec -= s->timing_info.sink_usec;
1716         }
1717     }
1718
1719     return usec;
1720 }
1721
1722 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1723     pa_operation *o = userdata;
1724     struct timeval local, remote, now;
1725     pa_timing_info *i;
1726     pa_bool_t playing = FALSE;
1727     uint64_t underrun_for = 0, playing_for = 0;
1728
1729     pa_assert(pd);
1730     pa_assert(o);
1731     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1732
1733     if (!o->context || !o->stream)
1734         goto finish;
1735
1736     i = &o->stream->timing_info;
1737
1738     o->stream->timing_info_valid = FALSE;
1739     i->write_index_corrupt = TRUE;
1740     i->read_index_corrupt = TRUE;
1741
1742     if (command != PA_COMMAND_REPLY) {
1743         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1744             goto finish;
1745
1746     } else {
1747
1748         if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1749             pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1750             pa_tagstruct_get_boolean(t, &playing) < 0 ||
1751             pa_tagstruct_get_timeval(t, &local) < 0 ||
1752             pa_tagstruct_get_timeval(t, &remote) < 0 ||
1753             pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1754             pa_tagstruct_gets64(t, &i->read_index) < 0) {
1755
1756             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1757             goto finish;
1758         }
1759
1760         if (o->context->version >= 13 &&
1761             o->stream->direction == PA_STREAM_PLAYBACK)
1762             if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1763                 pa_tagstruct_getu64(t, &playing_for) < 0) {
1764
1765                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1766                 goto finish;
1767             }
1768
1769
1770         if (!pa_tagstruct_eof(t)) {
1771             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1772             goto finish;
1773         }
1774         o->stream->timing_info_valid = TRUE;
1775         i->write_index_corrupt = FALSE;
1776         i->read_index_corrupt = FALSE;
1777
1778         i->playing = (int) playing;
1779         i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1780
1781         pa_gettimeofday(&now);
1782
1783         /* Calculcate timestamps */
1784         if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1785             /* local and remote seem to have synchronized clocks */
1786
1787             if (o->stream->direction == PA_STREAM_PLAYBACK)
1788                 i->transport_usec = pa_timeval_diff(&remote, &local);
1789             else
1790                 i->transport_usec = pa_timeval_diff(&now, &remote);
1791
1792             i->synchronized_clocks = TRUE;
1793             i->timestamp = remote;
1794         } else {
1795             /* clocks are not synchronized, let's estimate latency then */
1796             i->transport_usec = pa_timeval_diff(&now, &local)/2;
1797             i->synchronized_clocks = FALSE;
1798             i->timestamp = local;
1799             pa_timeval_add(&i->timestamp, i->transport_usec);
1800         }
1801
1802         /* Invalidate read and write indexes if necessary */
1803         if (tag < o->stream->read_index_not_before)
1804             i->read_index_corrupt = TRUE;
1805
1806         if (tag < o->stream->write_index_not_before)
1807             i->write_index_corrupt = TRUE;
1808
1809         if (o->stream->direction == PA_STREAM_PLAYBACK) {
1810             /* Write index correction */
1811
1812             int n, j;
1813             uint32_t ctag = tag;
1814
1815             /* Go through the saved correction values and add up the
1816              * total correction.*/
1817             for (n = 0, j = o->stream->current_write_index_correction+1;
1818                  n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1819                  n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1820
1821                 /* Step over invalid data or out-of-date data */
1822                 if (!o->stream->write_index_corrections[j].valid ||
1823                     o->stream->write_index_corrections[j].tag < ctag)
1824                     continue;
1825
1826                 /* Make sure that everything is in order */
1827                 ctag = o->stream->write_index_corrections[j].tag+1;
1828
1829                 /* Now fix the write index */
1830                 if (o->stream->write_index_corrections[j].corrupt) {
1831                     /* A corrupting seek was made */
1832                     i->write_index_corrupt = TRUE;
1833                 } else if (o->stream->write_index_corrections[j].absolute) {
1834                     /* An absolute seek was made */
1835                     i->write_index = o->stream->write_index_corrections[j].value;
1836                     i->write_index_corrupt = FALSE;
1837                 } else if (!i->write_index_corrupt) {
1838                     /* A relative seek was made */
1839                     i->write_index += o->stream->write_index_corrections[j].value;
1840                 }
1841             }
1842
1843             /* Clear old correction entries */
1844             for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1845                 if (!o->stream->write_index_corrections[n].valid)
1846                     continue;
1847
1848                 if (o->stream->write_index_corrections[n].tag <= tag)
1849                     o->stream->write_index_corrections[n].valid = FALSE;
1850             }
1851         }
1852
1853         if (o->stream->direction == PA_STREAM_RECORD) {
1854             /* Read index correction */
1855
1856             if (!i->read_index_corrupt)
1857                 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1858         }
1859
1860         /* Update smoother if we're not corked */
1861         if (o->stream->smoother && !o->stream->corked) {
1862             pa_usec_t u, x;
1863
1864             u = x = pa_rtclock_now() - i->transport_usec;
1865
1866             if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1867                 pa_usec_t su;
1868
1869                 /* If we weren't playing then it will take some time
1870                  * until the audio will actually come out through the
1871                  * speakers. Since we follow that timing here, we need
1872                  * to try to fix this up */
1873
1874                 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1875
1876                 if (su < i->sink_usec)
1877                     x += i->sink_usec - su;
1878             }
1879
1880             if (!i->playing)
1881                 pa_smoother_pause(o->stream->smoother, x);
1882
1883             /* Update the smoother */
1884             if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1885                 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1886                 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1887
1888             if (i->playing)
1889                 pa_smoother_resume(o->stream->smoother, x, TRUE);
1890         }
1891     }
1892
1893     o->stream->auto_timing_update_requested = FALSE;
1894
1895     if (o->stream->latency_update_callback)
1896         o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1897
1898     if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1899         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1900         cb(o->stream, o->stream->timing_info_valid, o->userdata);
1901     }
1902
1903 finish:
1904
1905     pa_operation_done(o);
1906     pa_operation_unref(o);
1907 }
1908
1909 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1910     uint32_t tag;
1911     pa_operation *o;
1912     pa_tagstruct *t;
1913     struct timeval now;
1914     int cidx = 0;
1915
1916     pa_assert(s);
1917     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1918
1919     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1920     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1921     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1922
1923     if (s->direction == PA_STREAM_PLAYBACK) {
1924         /* Find a place to store the write_index correction data for this entry */
1925         cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1926
1927         /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1928         PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1929     }
1930     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1931
1932     t = pa_tagstruct_command(
1933             s->context,
1934             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1935             &tag);
1936     pa_tagstruct_putu32(t, s->channel);
1937     pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1938
1939     pa_pstream_send_tagstruct(s->context->pstream, t);
1940     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1941
1942     if (s->direction == PA_STREAM_PLAYBACK) {
1943         /* Fill in initial correction data */
1944
1945         s->current_write_index_correction = cidx;
1946
1947         s->write_index_corrections[cidx].valid = TRUE;
1948         s->write_index_corrections[cidx].absolute = FALSE;
1949         s->write_index_corrections[cidx].corrupt = FALSE;
1950         s->write_index_corrections[cidx].tag = tag;
1951         s->write_index_corrections[cidx].value = 0;
1952     }
1953
1954     return o;
1955 }
1956
1957 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1958     pa_stream *s = userdata;
1959
1960     pa_assert(pd);
1961     pa_assert(s);
1962     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1963
1964     pa_stream_ref(s);
1965
1966     if (command != PA_COMMAND_REPLY) {
1967         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1968             goto finish;
1969
1970         pa_stream_set_state(s, PA_STREAM_FAILED);
1971         goto finish;
1972     } else if (!pa_tagstruct_eof(t)) {
1973         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1974         goto finish;
1975     }
1976
1977     pa_stream_set_state(s, PA_STREAM_TERMINATED);
1978
1979 finish:
1980     pa_stream_unref(s);
1981 }
1982
1983 int pa_stream_disconnect(pa_stream *s) {
1984     pa_tagstruct *t;
1985     uint32_t tag;
1986
1987     pa_assert(s);
1988     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1989
1990     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1991     PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1992     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1993
1994     pa_stream_ref(s);
1995
1996     t = pa_tagstruct_command(
1997             s->context,
1998             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1999                         (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
2000             &tag);
2001     pa_tagstruct_putu32(t, s->channel);
2002     pa_pstream_send_tagstruct(s->context->pstream, t);
2003     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
2004
2005     pa_stream_unref(s);
2006     return 0;
2007 }
2008
2009 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2010     pa_assert(s);
2011     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2012
2013     if (pa_detect_fork())
2014         return;
2015
2016     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2017         return;
2018
2019     s->read_callback = cb;
2020     s->read_userdata = userdata;
2021 }
2022
2023 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2024     pa_assert(s);
2025     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2026
2027     if (pa_detect_fork())
2028         return;
2029
2030     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2031         return;
2032
2033     s->write_callback = cb;
2034     s->write_userdata = userdata;
2035 }
2036
2037 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2038     pa_assert(s);
2039     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2040
2041     if (pa_detect_fork())
2042         return;
2043
2044     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2045         return;
2046
2047     s->state_callback = cb;
2048     s->state_userdata = userdata;
2049 }
2050
2051 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2052     pa_assert(s);
2053     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2054
2055     if (pa_detect_fork())
2056         return;
2057
2058     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2059         return;
2060
2061     s->overflow_callback = cb;
2062     s->overflow_userdata = userdata;
2063 }
2064
2065 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2066     pa_assert(s);
2067     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2068
2069     if (pa_detect_fork())
2070         return;
2071
2072     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2073         return;
2074
2075     s->underflow_callback = cb;
2076     s->underflow_userdata = userdata;
2077 }
2078
2079 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2080     pa_assert(s);
2081     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2082
2083     if (pa_detect_fork())
2084         return;
2085
2086     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2087         return;
2088
2089     s->latency_update_callback = cb;
2090     s->latency_update_userdata = userdata;
2091 }
2092
2093 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2094     pa_assert(s);
2095     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2096
2097     if (pa_detect_fork())
2098         return;
2099
2100     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2101         return;
2102
2103     s->moved_callback = cb;
2104     s->moved_userdata = userdata;
2105 }
2106
2107 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2108     pa_assert(s);
2109     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2110
2111     if (pa_detect_fork())
2112         return;
2113
2114     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2115         return;
2116
2117     s->suspended_callback = cb;
2118     s->suspended_userdata = userdata;
2119 }
2120
2121 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2122     pa_assert(s);
2123     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2124
2125     if (pa_detect_fork())
2126         return;
2127
2128     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2129         return;
2130
2131     s->started_callback = cb;
2132     s->started_userdata = userdata;
2133 }
2134
2135 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
2136     pa_assert(s);
2137     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2138
2139     if (pa_detect_fork())
2140         return;
2141
2142     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2143         return;
2144
2145     s->event_callback = cb;
2146     s->event_userdata = userdata;
2147 }
2148
2149 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2150     pa_assert(s);
2151     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2152
2153     if (pa_detect_fork())
2154         return;
2155
2156     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2157         return;
2158
2159     s->buffer_attr_callback = cb;
2160     s->buffer_attr_userdata = userdata;
2161 }
2162
2163 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2164     pa_operation *o = userdata;
2165     int success = 1;
2166
2167     pa_assert(pd);
2168     pa_assert(o);
2169     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2170
2171     if (!o->context)
2172         goto finish;
2173
2174     if (command != PA_COMMAND_REPLY) {
2175         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2176             goto finish;
2177
2178         success = 0;
2179     } else if (!pa_tagstruct_eof(t)) {
2180         pa_context_fail(o->context, PA_ERR_PROTOCOL);
2181         goto finish;
2182     }
2183
2184     if (o->callback) {
2185         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2186         cb(o->stream, success, o->userdata);
2187     }
2188
2189 finish:
2190     pa_operation_done(o);
2191     pa_operation_unref(o);
2192 }
2193
2194 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2195     pa_operation *o;
2196     pa_tagstruct *t;
2197     uint32_t tag;
2198
2199     pa_assert(s);
2200     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2201
2202     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2203     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2204     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2205
2206     /* Ask for a timing update before we cork/uncork to get the best
2207      * accuracy for the transport latency suitable for the
2208      * check_smoother_status() call in the started callback */
2209     request_auto_timing_update(s, TRUE);
2210
2211     s->corked = b;
2212
2213     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2214
2215     t = pa_tagstruct_command(
2216             s->context,
2217             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2218             &tag);
2219     pa_tagstruct_putu32(t, s->channel);
2220     pa_tagstruct_put_boolean(t, !!b);
2221     pa_pstream_send_tagstruct(s->context->pstream, t);
2222     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2223
2224     check_smoother_status(s, FALSE, FALSE, FALSE);
2225
2226     /* This might cause the indexes to hang/start again, hence let's
2227      * request a timing update, after the cork/uncork, too */
2228     request_auto_timing_update(s, TRUE);
2229
2230     return o;
2231 }
2232
2233 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2234     pa_tagstruct *t;
2235     pa_operation *o;
2236     uint32_t tag;
2237
2238     pa_assert(s);
2239     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2240
2241     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2242     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2243
2244     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2245
2246     t = pa_tagstruct_command(s->context, command, &tag);
2247     pa_tagstruct_putu32(t, s->channel);
2248     pa_pstream_send_tagstruct(s->context->pstream, t);
2249     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2250
2251     return o;
2252 }
2253
2254 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2255     pa_operation *o;
2256
2257     pa_assert(s);
2258     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2259
2260     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2261     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2262     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2263
2264     /* Ask for a timing update *before* the flush, so that the
2265      * transport usec is as up to date as possible when we get the
2266      * underflow message and update the smoother status*/
2267     request_auto_timing_update(s, TRUE);
2268
2269     if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2270         return NULL;
2271
2272     if (s->direction == PA_STREAM_PLAYBACK) {
2273
2274         if (s->write_index_corrections[s->current_write_index_correction].valid)
2275             s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2276
2277         if (s->buffer_attr.prebuf > 0)
2278             check_smoother_status(s, FALSE, FALSE, TRUE);
2279
2280         /* This will change the write index, but leave the
2281          * read index untouched. */
2282         invalidate_indexes(s, FALSE, TRUE);
2283
2284     } else
2285         /* For record streams this has no influence on the write
2286          * index, but the read index might jump. */
2287         invalidate_indexes(s, TRUE, FALSE);
2288
2289     /* Note that we do not update requested_bytes here. This is
2290      * because we cannot really know how data actually was dropped
2291      * from the write index due to this. This 'error' will be applied
2292      * by both client and server and hence we should be fine. */
2293
2294     return o;
2295 }
2296
2297 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2298     pa_operation *o;
2299
2300     pa_assert(s);
2301     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2302
2303     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2304     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2305     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2306     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2307
2308     /* Ask for a timing update before we cork/uncork to get the best
2309      * accuracy for the transport latency suitable for the
2310      * check_smoother_status() call in the started callback */
2311     request_auto_timing_update(s, TRUE);
2312
2313     if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2314         return NULL;
2315
2316     /* This might cause the read index to hang again, hence
2317      * let's request a timing update */
2318     request_auto_timing_update(s, TRUE);
2319
2320     return o;
2321 }
2322
2323 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2324     pa_operation *o;
2325
2326     pa_assert(s);
2327     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2328
2329     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2330     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2331     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2332     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2333
2334     /* Ask for a timing update before we cork/uncork to get the best
2335      * accuracy for the transport latency suitable for the
2336      * check_smoother_status() call in the started callback */
2337     request_auto_timing_update(s, TRUE);
2338
2339     if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2340         return NULL;
2341
2342     /* This might cause the read index to start moving again, hence
2343      * let's request a timing update */
2344     request_auto_timing_update(s, TRUE);
2345
2346     return o;
2347 }
2348
2349 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2350     pa_operation *o;
2351
2352     pa_assert(s);
2353     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2354     pa_assert(name);
2355
2356     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2357     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2358     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2359
2360     if (s->context->version >= 13) {
2361         pa_proplist *p = pa_proplist_new();
2362
2363         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2364         o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2365         pa_proplist_free(p);
2366     } else {
2367         pa_tagstruct *t;
2368         uint32_t tag;
2369
2370         o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2371         t = pa_tagstruct_command(
2372                 s->context,
2373                 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2374                 &tag);
2375         pa_tagstruct_putu32(t, s->channel);
2376         pa_tagstruct_puts(t, name);
2377         pa_pstream_send_tagstruct(s->context->pstream, t);
2378         pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2379     }
2380
2381     return o;
2382 }
2383
2384 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2385     pa_usec_t usec;
2386
2387     pa_assert(s);
2388     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2389
2390     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2391     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2392     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2393     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2394     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2395     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2396
2397     if (s->smoother)
2398         usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2399     else
2400         usec = calc_time(s, FALSE);
2401
2402     /* Make sure the time runs monotonically */
2403     if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2404         if (usec < s->previous_time)
2405             usec = s->previous_time;
2406         else
2407             s->previous_time = usec;
2408     }
2409
2410     if (r_usec)
2411         *r_usec = usec;
2412
2413     return 0;
2414 }
2415
2416 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2417     pa_assert(s);
2418     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2419
2420     if (negative)
2421         *negative = 0;
2422
2423     if (a >= b)
2424         return a-b;
2425     else {
2426         if (negative && s->direction == PA_STREAM_RECORD) {
2427             *negative = 1;
2428             return b-a;
2429         } else
2430             return 0;
2431     }
2432 }
2433
2434 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2435     pa_usec_t t, c;
2436     int r;
2437     int64_t cindex;
2438
2439     pa_assert(s);
2440     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2441     pa_assert(r_usec);
2442
2443     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2444     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2445     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2446     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2447     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2448     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2449
2450     if ((r = pa_stream_get_time(s, &t)) < 0)
2451         return r;
2452
2453     if (s->direction == PA_STREAM_PLAYBACK)
2454         cindex = s->timing_info.write_index;
2455     else
2456         cindex = s->timing_info.read_index;
2457
2458     if (cindex < 0)
2459         cindex = 0;
2460
2461     c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2462
2463     if (s->direction == PA_STREAM_PLAYBACK)
2464         *r_usec = time_counter_diff(s, c, t, negative);
2465     else
2466         *r_usec = time_counter_diff(s, t, c, negative);
2467
2468     return 0;
2469 }
2470
2471 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2472     pa_assert(s);
2473     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2474
2475     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2476     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2477     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2478     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2479
2480     return &s->timing_info;
2481 }
2482
2483 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2484     pa_assert(s);
2485     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2486
2487     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2488
2489     return &s->sample_spec;
2490 }
2491
2492 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2493     pa_assert(s);
2494     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2495
2496     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2497
2498     return &s->channel_map;
2499 }
2500
2501 const pa_format_info* pa_stream_get_format_info(pa_stream *s) {
2502     pa_assert(s);
2503     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2504
2505     /* We don't have the format till routing is done */
2506     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2507     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2508
2509     return s->format;
2510 }
2511 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2512     pa_assert(s);
2513     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2514
2515     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2516     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2517     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2518
2519     return &s->buffer_attr;
2520 }
2521
2522 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2523     pa_operation *o = userdata;
2524     int success = 1;
2525
2526     pa_assert(pd);
2527     pa_assert(o);
2528     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2529
2530     if (!o->context)
2531         goto finish;
2532
2533     if (command != PA_COMMAND_REPLY) {
2534         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2535             goto finish;
2536
2537         success = 0;
2538     } else {
2539         if (o->stream->direction == PA_STREAM_PLAYBACK) {
2540             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2541                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2542                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2543                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2544                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2545                 goto finish;
2546             }
2547         } else if (o->stream->direction == PA_STREAM_RECORD) {
2548             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2549                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2550                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2551                 goto finish;
2552             }
2553         }
2554
2555         if (o->stream->context->version >= 13) {
2556             pa_usec_t usec;
2557
2558             if (pa_tagstruct_get_usec(t, &usec) < 0) {
2559                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2560                 goto finish;
2561             }
2562
2563             if (o->stream->direction == PA_STREAM_RECORD)
2564                 o->stream->timing_info.configured_source_usec = usec;
2565             else
2566                 o->stream->timing_info.configured_sink_usec = usec;
2567         }
2568
2569         if (!pa_tagstruct_eof(t)) {
2570             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2571             goto finish;
2572         }
2573     }
2574
2575     if (o->callback) {
2576         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2577         cb(o->stream, success, o->userdata);
2578     }
2579
2580 finish:
2581     pa_operation_done(o);
2582     pa_operation_unref(o);
2583 }
2584
2585
2586 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2587     pa_operation *o;
2588     pa_tagstruct *t;
2589     uint32_t tag;
2590     pa_buffer_attr copy;
2591
2592     pa_assert(s);
2593     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2594     pa_assert(attr);
2595
2596     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2597     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2598     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2599     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2600
2601     /* Ask for a timing update before we cork/uncork to get the best
2602      * accuracy for the transport latency suitable for the
2603      * check_smoother_status() call in the started callback */
2604     request_auto_timing_update(s, TRUE);
2605
2606     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2607
2608     t = pa_tagstruct_command(
2609             s->context,
2610             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2611             &tag);
2612     pa_tagstruct_putu32(t, s->channel);
2613
2614     copy = *attr;
2615     patch_buffer_attr(s, &copy, NULL);
2616     attr = &copy;
2617
2618     pa_tagstruct_putu32(t, attr->maxlength);
2619
2620     if (s->direction == PA_STREAM_PLAYBACK)
2621         pa_tagstruct_put(
2622                 t,
2623                 PA_TAG_U32, attr->tlength,
2624                 PA_TAG_U32, attr->prebuf,
2625                 PA_TAG_U32, attr->minreq,
2626                 PA_TAG_INVALID);
2627     else
2628         pa_tagstruct_putu32(t, attr->fragsize);
2629
2630     if (s->context->version >= 13)
2631         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2632
2633     if (s->context->version >= 14)
2634         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2635
2636     pa_pstream_send_tagstruct(s->context->pstream, t);
2637     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2638
2639     /* This might cause changes in the read/write indexex, hence let's
2640      * request a timing update */
2641     request_auto_timing_update(s, TRUE);
2642
2643     return o;
2644 }
2645
2646 uint32_t pa_stream_get_device_index(pa_stream *s) {
2647     pa_assert(s);
2648     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2649
2650     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2651     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2652     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2653     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2654     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2655
2656     return s->device_index;
2657 }
2658
2659 const char *pa_stream_get_device_name(pa_stream *s) {
2660     pa_assert(s);
2661     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2662
2663     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2664     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2665     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2666     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2667     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2668
2669     return s->device_name;
2670 }
2671
2672 int pa_stream_is_suspended(pa_stream *s) {
2673     pa_assert(s);
2674     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2675
2676     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2677     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2678     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2679     PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2680
2681     return s->suspended;
2682 }
2683
2684 int pa_stream_is_corked(pa_stream *s) {
2685     pa_assert(s);
2686     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2687
2688     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2689     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2690     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2691
2692     return s->corked;
2693 }
2694
2695 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2696     pa_operation *o = userdata;
2697     int success = 1;
2698
2699     pa_assert(pd);
2700     pa_assert(o);
2701     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2702
2703     if (!o->context)
2704         goto finish;
2705
2706     if (command != PA_COMMAND_REPLY) {
2707         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2708             goto finish;
2709
2710         success = 0;
2711     } else {
2712
2713         if (!pa_tagstruct_eof(t)) {
2714             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2715             goto finish;
2716         }
2717     }
2718
2719     o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2720     pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2721
2722     if (o->callback) {
2723         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2724         cb(o->stream, success, o->userdata);
2725     }
2726
2727 finish:
2728     pa_operation_done(o);
2729     pa_operation_unref(o);
2730 }
2731
2732
2733 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2734     pa_operation *o;
2735     pa_tagstruct *t;
2736     uint32_t tag;
2737
2738     pa_assert(s);
2739     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2740
2741     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2742     PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2743     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2744     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2745     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2746     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2747
2748     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2749     o->private = PA_UINT_TO_PTR(rate);
2750
2751     t = pa_tagstruct_command(
2752             s->context,
2753             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2754             &tag);
2755     pa_tagstruct_putu32(t, s->channel);
2756     pa_tagstruct_putu32(t, rate);
2757
2758     pa_pstream_send_tagstruct(s->context->pstream, t);
2759     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2760
2761     return o;
2762 }
2763
2764 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2765     pa_operation *o;
2766     pa_tagstruct *t;
2767     uint32_t tag;
2768
2769     pa_assert(s);
2770     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2771
2772     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2773     PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2774     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2775     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2776     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2777
2778     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2779
2780     t = pa_tagstruct_command(
2781             s->context,
2782             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2783             &tag);
2784     pa_tagstruct_putu32(t, s->channel);
2785     pa_tagstruct_putu32(t, (uint32_t) mode);
2786     pa_tagstruct_put_proplist(t, p);
2787
2788     pa_pstream_send_tagstruct(s->context->pstream, t);
2789     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2790
2791     /* Please note that we don't update s->proplist here, because we
2792      * don't export that field */
2793
2794     return o;
2795 }
2796
2797 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2798     pa_operation *o;
2799     pa_tagstruct *t;
2800     uint32_t tag;
2801     const char * const*k;
2802
2803     pa_assert(s);
2804     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2805
2806     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2807     PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2808     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2809     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2810     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2811
2812     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2813
2814     t = pa_tagstruct_command(
2815             s->context,
2816             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2817             &tag);
2818     pa_tagstruct_putu32(t, s->channel);
2819
2820     for (k = keys; *k; k++)
2821         pa_tagstruct_puts(t, *k);
2822
2823     pa_tagstruct_puts(t, NULL);
2824
2825     pa_pstream_send_tagstruct(s->context->pstream, t);
2826     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2827
2828     /* Please note that we don't update s->proplist here, because we
2829      * don't export that field */
2830
2831     return o;
2832 }
2833
2834 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2835     pa_assert(s);
2836     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2837
2838     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2839     PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2840     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2841     PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2842
2843     s->direct_on_input = sink_input_idx;
2844
2845     return 0;
2846 }
2847
2848 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2849     pa_assert(s);
2850     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2851
2852     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2853     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2854     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2855
2856     return s->direct_on_input;
2857 }