core: Fix some FIXMEs for the extended API
[profile/ivi/pulseaudio.git] / src / pulse / stream.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/rtclock.h>
34 #include <pulse/xmalloc.h>
35 #include <pulse/fork-detect.h>
36
37 #include <pulsecore/pstream-util.h>
38 #include <pulsecore/log.h>
39 #include <pulsecore/hashmap.h>
40 #include <pulsecore/macro.h>
41 #include <pulsecore/core-rtclock.h>
42 #include <pulsecore/core-util.h>
43
44 #include "internal.h"
45 #include "stream.h"
46
47 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
48 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
49
50 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
51 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
52 #define SMOOTHER_MIN_HISTORY (4)
53
54 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
55     return pa_stream_new_with_proplist(c, name, ss, map, NULL);
56 }
57
58 static void reset_callbacks(pa_stream *s) {
59     s->read_callback = NULL;
60     s->read_userdata = NULL;
61     s->write_callback = NULL;
62     s->write_userdata = NULL;
63     s->state_callback = NULL;
64     s->state_userdata = NULL;
65     s->overflow_callback = NULL;
66     s->overflow_userdata = NULL;
67     s->underflow_callback = NULL;
68     s->underflow_userdata = NULL;
69     s->latency_update_callback = NULL;
70     s->latency_update_userdata = NULL;
71     s->moved_callback = NULL;
72     s->moved_userdata = NULL;
73     s->suspended_callback = NULL;
74     s->suspended_userdata = NULL;
75     s->started_callback = NULL;
76     s->started_userdata = NULL;
77     s->event_callback = NULL;
78     s->event_userdata = NULL;
79     s->buffer_attr_callback = NULL;
80     s->buffer_attr_userdata = NULL;
81 }
82
83 static pa_stream *pa_stream_new_with_proplist_internal(
84         pa_context *c,
85         const char *name,
86         const pa_sample_spec *ss,
87         const pa_channel_map *map,
88         pa_format_info * const *formats,
89         pa_proplist *p) {
90
91     pa_stream *s;
92     int i;
93
94     pa_assert(c);
95     pa_assert(PA_REFCNT_VALUE(c) >= 1);
96     pa_assert((ss == NULL && map == NULL) || formats == NULL);
97
98     PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
99     PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
100
101     s = pa_xnew(pa_stream, 1);
102     PA_REFCNT_INIT(s);
103     s->context = c;
104     s->mainloop = c->mainloop;
105
106     s->direction = PA_STREAM_NODIRECTION;
107     s->state = PA_STREAM_UNCONNECTED;
108     s->flags = 0;
109
110     if (ss)
111         s->sample_spec = *ss;
112     else
113         s->sample_spec.format = PA_SAMPLE_INVALID;
114
115     if (map)
116         s->channel_map = *map;
117     else
118         pa_channel_map_init(&s->channel_map);
119
120     s->n_formats = 0;
121     if (formats) {
122         for (i = 0; formats[i] && i < PA_MAX_FORMATS; i++) {
123             s->n_formats++;
124             s->req_formats[i] = pa_format_info_copy(formats[i]);
125         }
126         /* Make sure the input array was NULL-terminated */
127         pa_assert(formats[i] == NULL);
128     }
129
130     /* We'll get the final negotiated format after connecting */
131     s->format = NULL;
132
133     s->direct_on_input = PA_INVALID_INDEX;
134
135     s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
136     if (name)
137         pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
138
139     s->channel = 0;
140     s->channel_valid = FALSE;
141     s->syncid = c->csyncid++;
142     s->stream_index = PA_INVALID_INDEX;
143
144     s->requested_bytes = 0;
145     memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
146
147     /* We initialize der target length here, so that if the user
148      * passes no explicit buffering metrics the default is similar to
149      * what older PA versions provided. */
150
151     s->buffer_attr.maxlength = (uint32_t) -1;
152     if (ss)
153         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
154     else {
155         /* FIXME: We assume a worst-case compressed format corresponding to
156          * 48000 Hz, 2 ch, S16 PCM, but this can very well be incorrect */
157         pa_sample_spec tmp_ss = {
158             .format   = PA_SAMPLE_S16NE,
159             .rate     = 48000,
160             .channels = 2,
161         };
162         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &tmp_ss); /* 250ms of buffering */
163     }
164     s->buffer_attr.minreq = (uint32_t) -1;
165     s->buffer_attr.prebuf = (uint32_t) -1;
166     s->buffer_attr.fragsize = (uint32_t) -1;
167
168     s->device_index = PA_INVALID_INDEX;
169     s->device_name = NULL;
170     s->suspended = FALSE;
171     s->corked = FALSE;
172
173     s->write_memblock = NULL;
174     s->write_data = NULL;
175
176     pa_memchunk_reset(&s->peek_memchunk);
177     s->peek_data = NULL;
178     s->record_memblockq = NULL;
179
180     memset(&s->timing_info, 0, sizeof(s->timing_info));
181     s->timing_info_valid = FALSE;
182
183     s->previous_time = 0;
184
185     s->read_index_not_before = 0;
186     s->write_index_not_before = 0;
187     for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
188         s->write_index_corrections[i].valid = 0;
189     s->current_write_index_correction = 0;
190
191     s->auto_timing_update_event = NULL;
192     s->auto_timing_update_requested = FALSE;
193     s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
194
195     reset_callbacks(s);
196
197     s->smoother = NULL;
198
199     /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
200     PA_LLIST_PREPEND(pa_stream, c->streams, s);
201     pa_stream_ref(s);
202
203     return s;
204 }
205
206 pa_stream *pa_stream_new_with_proplist(
207         pa_context *c,
208         const char *name,
209         const pa_sample_spec *ss,
210         const pa_channel_map *map,
211         pa_proplist *p) {
212
213     pa_channel_map tmap;
214
215     PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
216     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
217     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
218     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
219     PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
220
221     if (!map)
222         PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
223
224     return pa_stream_new_with_proplist_internal(c, name, ss, map, NULL, p);
225 }
226
227 pa_stream *pa_stream_new_extended(
228         pa_context *c,
229         const char *name,
230         pa_format_info * const *formats,
231         pa_proplist *p) {
232
233     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 21, PA_ERR_NOTSUPPORTED);
234
235     return pa_stream_new_with_proplist_internal(c, name, NULL, NULL, formats, p);
236 }
237
238 static void stream_unlink(pa_stream *s) {
239     pa_operation *o, *n;
240     pa_assert(s);
241
242     if (!s->context)
243         return;
244
245     /* Detach from context */
246
247     /* Unref all operatio object that point to us */
248     for (o = s->context->operations; o; o = n) {
249         n = o->next;
250
251         if (o->stream == s)
252             pa_operation_cancel(o);
253     }
254
255     /* Drop all outstanding replies for this stream */
256     if (s->context->pdispatch)
257         pa_pdispatch_unregister_reply(s->context->pdispatch, s);
258
259     if (s->channel_valid) {
260         pa_hashmap_remove((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, PA_UINT32_TO_PTR(s->channel));
261         s->channel = 0;
262         s->channel_valid = FALSE;
263     }
264
265     PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
266     pa_stream_unref(s);
267
268     s->context = NULL;
269
270     if (s->auto_timing_update_event) {
271         pa_assert(s->mainloop);
272         s->mainloop->time_free(s->auto_timing_update_event);
273     }
274
275     reset_callbacks(s);
276 }
277
278 static void stream_free(pa_stream *s) {
279     unsigned int i;
280
281     pa_assert(s);
282
283     stream_unlink(s);
284
285     if (s->write_memblock) {
286         pa_memblock_release(s->write_memblock);
287         pa_memblock_unref(s->write_data);
288     }
289
290     if (s->peek_memchunk.memblock) {
291         if (s->peek_data)
292             pa_memblock_release(s->peek_memchunk.memblock);
293         pa_memblock_unref(s->peek_memchunk.memblock);
294     }
295
296     if (s->record_memblockq)
297         pa_memblockq_free(s->record_memblockq);
298
299     if (s->proplist)
300         pa_proplist_free(s->proplist);
301
302     if (s->smoother)
303         pa_smoother_free(s->smoother);
304
305     for (i = 0; i < s->n_formats; i++)
306         pa_xfree(s->req_formats[i]);
307
308     pa_xfree(s->device_name);
309     pa_xfree(s);
310 }
311
312 void pa_stream_unref(pa_stream *s) {
313     pa_assert(s);
314     pa_assert(PA_REFCNT_VALUE(s) >= 1);
315
316     if (PA_REFCNT_DEC(s) <= 0)
317         stream_free(s);
318 }
319
320 pa_stream* pa_stream_ref(pa_stream *s) {
321     pa_assert(s);
322     pa_assert(PA_REFCNT_VALUE(s) >= 1);
323
324     PA_REFCNT_INC(s);
325     return s;
326 }
327
328 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
329     pa_assert(s);
330     pa_assert(PA_REFCNT_VALUE(s) >= 1);
331
332     return s->state;
333 }
334
335 pa_context* pa_stream_get_context(pa_stream *s) {
336     pa_assert(s);
337     pa_assert(PA_REFCNT_VALUE(s) >= 1);
338
339     return s->context;
340 }
341
342 uint32_t pa_stream_get_index(pa_stream *s) {
343     pa_assert(s);
344     pa_assert(PA_REFCNT_VALUE(s) >= 1);
345
346     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
347     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
348
349     return s->stream_index;
350 }
351
352 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
353     pa_assert(s);
354     pa_assert(PA_REFCNT_VALUE(s) >= 1);
355
356     if (s->state == st)
357         return;
358
359     pa_stream_ref(s);
360
361     s->state = st;
362
363     if (s->state_callback)
364         s->state_callback(s, s->state_userdata);
365
366     if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
367         stream_unlink(s);
368
369     pa_stream_unref(s);
370 }
371
372 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
373     pa_assert(s);
374     pa_assert(PA_REFCNT_VALUE(s) >= 1);
375
376     if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
377         return;
378
379     if (s->state == PA_STREAM_READY &&
380         (force || !s->auto_timing_update_requested)) {
381         pa_operation *o;
382
383 /*         pa_log("Automatically requesting new timing data"); */
384
385         if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
386             pa_operation_unref(o);
387             s->auto_timing_update_requested = TRUE;
388         }
389     }
390
391     if (s->auto_timing_update_event) {
392         if (force)
393             s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
394
395         pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
396
397         s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
398     }
399 }
400
401 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
402     pa_context *c = userdata;
403     pa_stream *s;
404     uint32_t channel;
405
406     pa_assert(pd);
407     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
408     pa_assert(t);
409     pa_assert(c);
410     pa_assert(PA_REFCNT_VALUE(c) >= 1);
411
412     pa_context_ref(c);
413
414     if (pa_tagstruct_getu32(t, &channel) < 0 ||
415         !pa_tagstruct_eof(t)) {
416         pa_context_fail(c, PA_ERR_PROTOCOL);
417         goto finish;
418     }
419
420     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
421         goto finish;
422
423     if (s->state != PA_STREAM_READY)
424         goto finish;
425
426     pa_context_set_error(c, PA_ERR_KILLED);
427     pa_stream_set_state(s, PA_STREAM_FAILED);
428
429 finish:
430     pa_context_unref(c);
431 }
432
433 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
434     pa_usec_t x;
435
436     pa_assert(s);
437     pa_assert(!force_start || !force_stop);
438
439     if (!s->smoother)
440         return;
441
442     x = pa_rtclock_now();
443
444     if (s->timing_info_valid) {
445         if (aposteriori)
446             x -= s->timing_info.transport_usec;
447         else
448             x += s->timing_info.transport_usec;
449     }
450
451     if (s->suspended || s->corked || force_stop)
452         pa_smoother_pause(s->smoother, x);
453     else if (force_start || s->buffer_attr.prebuf == 0) {
454
455         if (!s->timing_info_valid &&
456             !aposteriori &&
457             !force_start &&
458             !force_stop &&
459             s->context->version >= 13) {
460
461             /* If the server supports STARTED events we take them as
462              * indications when audio really starts/stops playing, if
463              * we don't have any timing info yet -- instead of trying
464              * to be smart and guessing the server time. Otherwise the
465              * unknown transport delay add too much noise to our time
466              * calculations. */
467
468             return;
469         }
470
471         pa_smoother_resume(s->smoother, x, TRUE);
472     }
473
474     /* Please note that we have no idea if playback actually started
475      * if prebuf is non-zero! */
476 }
477
478 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
479     pa_context *c = userdata;
480     pa_stream *s;
481     uint32_t channel;
482     const char *dn;
483     pa_bool_t suspended;
484     uint32_t di;
485     pa_usec_t usec = 0;
486     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
487
488     pa_assert(pd);
489     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
490     pa_assert(t);
491     pa_assert(c);
492     pa_assert(PA_REFCNT_VALUE(c) >= 1);
493
494     pa_context_ref(c);
495
496     if (c->version < 12) {
497         pa_context_fail(c, PA_ERR_PROTOCOL);
498         goto finish;
499     }
500
501     if (pa_tagstruct_getu32(t, &channel) < 0 ||
502         pa_tagstruct_getu32(t, &di) < 0 ||
503         pa_tagstruct_gets(t, &dn) < 0 ||
504         pa_tagstruct_get_boolean(t, &suspended) < 0) {
505         pa_context_fail(c, PA_ERR_PROTOCOL);
506         goto finish;
507     }
508
509     if (c->version >= 13) {
510
511         if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
512             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
513                 pa_tagstruct_getu32(t, &fragsize) < 0 ||
514                 pa_tagstruct_get_usec(t, &usec) < 0) {
515                 pa_context_fail(c, PA_ERR_PROTOCOL);
516                 goto finish;
517             }
518         } else {
519             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
520                 pa_tagstruct_getu32(t, &tlength) < 0 ||
521                 pa_tagstruct_getu32(t, &prebuf) < 0 ||
522                 pa_tagstruct_getu32(t, &minreq) < 0 ||
523                 pa_tagstruct_get_usec(t, &usec) < 0) {
524                 pa_context_fail(c, PA_ERR_PROTOCOL);
525                 goto finish;
526             }
527         }
528     }
529
530     if (!pa_tagstruct_eof(t)) {
531         pa_context_fail(c, PA_ERR_PROTOCOL);
532         goto finish;
533     }
534
535     if (!dn || di == PA_INVALID_INDEX) {
536         pa_context_fail(c, PA_ERR_PROTOCOL);
537         goto finish;
538     }
539
540     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
541         goto finish;
542
543     if (s->state != PA_STREAM_READY)
544         goto finish;
545
546     if (c->version >= 13) {
547         if (s->direction == PA_STREAM_RECORD)
548             s->timing_info.configured_source_usec = usec;
549         else
550             s->timing_info.configured_sink_usec = usec;
551
552         s->buffer_attr.maxlength = maxlength;
553         s->buffer_attr.fragsize = fragsize;
554         s->buffer_attr.tlength = tlength;
555         s->buffer_attr.prebuf = prebuf;
556         s->buffer_attr.minreq = minreq;
557     }
558
559     pa_xfree(s->device_name);
560     s->device_name = pa_xstrdup(dn);
561     s->device_index = di;
562
563     s->suspended = suspended;
564
565     check_smoother_status(s, TRUE, FALSE, FALSE);
566     request_auto_timing_update(s, TRUE);
567
568     if (s->moved_callback)
569         s->moved_callback(s, s->moved_userdata);
570
571 finish:
572     pa_context_unref(c);
573 }
574
575 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
576     pa_context *c = userdata;
577     pa_stream *s;
578     uint32_t channel;
579     pa_usec_t usec = 0;
580     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
581
582     pa_assert(pd);
583     pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
584     pa_assert(t);
585     pa_assert(c);
586     pa_assert(PA_REFCNT_VALUE(c) >= 1);
587
588     pa_context_ref(c);
589
590     if (c->version < 15) {
591         pa_context_fail(c, PA_ERR_PROTOCOL);
592         goto finish;
593     }
594
595     if (pa_tagstruct_getu32(t, &channel) < 0) {
596         pa_context_fail(c, PA_ERR_PROTOCOL);
597         goto finish;
598     }
599
600     if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
601         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
602             pa_tagstruct_getu32(t, &fragsize) < 0 ||
603             pa_tagstruct_get_usec(t, &usec) < 0) {
604             pa_context_fail(c, PA_ERR_PROTOCOL);
605             goto finish;
606         }
607     } else {
608         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
609             pa_tagstruct_getu32(t, &tlength) < 0 ||
610             pa_tagstruct_getu32(t, &prebuf) < 0 ||
611             pa_tagstruct_getu32(t, &minreq) < 0 ||
612             pa_tagstruct_get_usec(t, &usec) < 0) {
613             pa_context_fail(c, PA_ERR_PROTOCOL);
614             goto finish;
615         }
616     }
617
618     if (!pa_tagstruct_eof(t)) {
619         pa_context_fail(c, PA_ERR_PROTOCOL);
620         goto finish;
621     }
622
623     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
624         goto finish;
625
626     if (s->state != PA_STREAM_READY)
627         goto finish;
628
629     if (s->direction == PA_STREAM_RECORD)
630         s->timing_info.configured_source_usec = usec;
631     else
632         s->timing_info.configured_sink_usec = usec;
633
634     s->buffer_attr.maxlength = maxlength;
635     s->buffer_attr.fragsize = fragsize;
636     s->buffer_attr.tlength = tlength;
637     s->buffer_attr.prebuf = prebuf;
638     s->buffer_attr.minreq = minreq;
639
640     request_auto_timing_update(s, TRUE);
641
642     if (s->buffer_attr_callback)
643         s->buffer_attr_callback(s, s->buffer_attr_userdata);
644
645 finish:
646     pa_context_unref(c);
647 }
648
649 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
650     pa_context *c = userdata;
651     pa_stream *s;
652     uint32_t channel;
653     pa_bool_t suspended;
654
655     pa_assert(pd);
656     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
657     pa_assert(t);
658     pa_assert(c);
659     pa_assert(PA_REFCNT_VALUE(c) >= 1);
660
661     pa_context_ref(c);
662
663     if (c->version < 12) {
664         pa_context_fail(c, PA_ERR_PROTOCOL);
665         goto finish;
666     }
667
668     if (pa_tagstruct_getu32(t, &channel) < 0 ||
669         pa_tagstruct_get_boolean(t, &suspended) < 0 ||
670         !pa_tagstruct_eof(t)) {
671         pa_context_fail(c, PA_ERR_PROTOCOL);
672         goto finish;
673     }
674
675     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
676         goto finish;
677
678     if (s->state != PA_STREAM_READY)
679         goto finish;
680
681     s->suspended = suspended;
682
683     check_smoother_status(s, TRUE, FALSE, FALSE);
684     request_auto_timing_update(s, TRUE);
685
686     if (s->suspended_callback)
687         s->suspended_callback(s, s->suspended_userdata);
688
689 finish:
690     pa_context_unref(c);
691 }
692
693 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
694     pa_context *c = userdata;
695     pa_stream *s;
696     uint32_t channel;
697
698     pa_assert(pd);
699     pa_assert(command == PA_COMMAND_STARTED);
700     pa_assert(t);
701     pa_assert(c);
702     pa_assert(PA_REFCNT_VALUE(c) >= 1);
703
704     pa_context_ref(c);
705
706     if (c->version < 13) {
707         pa_context_fail(c, PA_ERR_PROTOCOL);
708         goto finish;
709     }
710
711     if (pa_tagstruct_getu32(t, &channel) < 0 ||
712         !pa_tagstruct_eof(t)) {
713         pa_context_fail(c, PA_ERR_PROTOCOL);
714         goto finish;
715     }
716
717     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
718         goto finish;
719
720     if (s->state != PA_STREAM_READY)
721         goto finish;
722
723     check_smoother_status(s, TRUE, TRUE, FALSE);
724     request_auto_timing_update(s, TRUE);
725
726     if (s->started_callback)
727         s->started_callback(s, s->started_userdata);
728
729 finish:
730     pa_context_unref(c);
731 }
732
733 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
734     pa_context *c = userdata;
735     pa_stream *s;
736     uint32_t channel;
737     pa_proplist *pl = NULL;
738     const char *event;
739
740     pa_assert(pd);
741     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
742     pa_assert(t);
743     pa_assert(c);
744     pa_assert(PA_REFCNT_VALUE(c) >= 1);
745
746     pa_context_ref(c);
747
748     if (c->version < 15) {
749         pa_context_fail(c, PA_ERR_PROTOCOL);
750         goto finish;
751     }
752
753     pl = pa_proplist_new();
754
755     if (pa_tagstruct_getu32(t, &channel) < 0 ||
756         pa_tagstruct_gets(t, &event) < 0 ||
757         pa_tagstruct_get_proplist(t, pl) < 0 ||
758         !pa_tagstruct_eof(t) || !event) {
759         pa_context_fail(c, PA_ERR_PROTOCOL);
760         goto finish;
761     }
762
763     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
764         goto finish;
765
766     if (s->state != PA_STREAM_READY)
767         goto finish;
768
769     if (s->event_callback)
770         s->event_callback(s, event, pl, s->event_userdata);
771
772 finish:
773     pa_context_unref(c);
774
775     if (pl)
776         pa_proplist_free(pl);
777 }
778
779 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
780     pa_stream *s;
781     pa_context *c = userdata;
782     uint32_t bytes, channel;
783
784     pa_assert(pd);
785     pa_assert(command == PA_COMMAND_REQUEST);
786     pa_assert(t);
787     pa_assert(c);
788     pa_assert(PA_REFCNT_VALUE(c) >= 1);
789
790     pa_context_ref(c);
791
792     if (pa_tagstruct_getu32(t, &channel) < 0 ||
793         pa_tagstruct_getu32(t, &bytes) < 0 ||
794         !pa_tagstruct_eof(t)) {
795         pa_context_fail(c, PA_ERR_PROTOCOL);
796         goto finish;
797     }
798
799     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
800         goto finish;
801
802     if (s->state != PA_STREAM_READY)
803         goto finish;
804
805     s->requested_bytes += bytes;
806
807     /* pa_log("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes); */
808
809     if (s->requested_bytes > 0 && s->write_callback)
810         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
811
812 finish:
813     pa_context_unref(c);
814 }
815
816 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
817     pa_stream *s;
818     pa_context *c = userdata;
819     uint32_t channel;
820
821     pa_assert(pd);
822     pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
823     pa_assert(t);
824     pa_assert(c);
825     pa_assert(PA_REFCNT_VALUE(c) >= 1);
826
827     pa_context_ref(c);
828
829     if (pa_tagstruct_getu32(t, &channel) < 0 ||
830         !pa_tagstruct_eof(t)) {
831         pa_context_fail(c, PA_ERR_PROTOCOL);
832         goto finish;
833     }
834
835     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
836         goto finish;
837
838     if (s->state != PA_STREAM_READY)
839         goto finish;
840
841     if (s->buffer_attr.prebuf > 0)
842         check_smoother_status(s, TRUE, FALSE, TRUE);
843
844     request_auto_timing_update(s, TRUE);
845
846     if (command == PA_COMMAND_OVERFLOW) {
847         if (s->overflow_callback)
848             s->overflow_callback(s, s->overflow_userdata);
849     } else if (command == PA_COMMAND_UNDERFLOW) {
850         if (s->underflow_callback)
851             s->underflow_callback(s, s->underflow_userdata);
852     }
853
854 finish:
855     pa_context_unref(c);
856 }
857
858 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
859     pa_assert(s);
860     pa_assert(PA_REFCNT_VALUE(s) >= 1);
861
862 /*     pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
863
864     if (s->state != PA_STREAM_READY)
865         return;
866
867     if (w) {
868         s->write_index_not_before = s->context->ctag;
869
870         if (s->timing_info_valid)
871             s->timing_info.write_index_corrupt = TRUE;
872
873 /*         pa_log("write_index invalidated"); */
874     }
875
876     if (r) {
877         s->read_index_not_before = s->context->ctag;
878
879         if (s->timing_info_valid)
880             s->timing_info.read_index_corrupt = TRUE;
881
882 /*         pa_log("read_index invalidated"); */
883     }
884
885     request_auto_timing_update(s, TRUE);
886 }
887
888 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
889     pa_stream *s = userdata;
890
891     pa_assert(s);
892     pa_assert(PA_REFCNT_VALUE(s) >= 1);
893
894     pa_stream_ref(s);
895     request_auto_timing_update(s, FALSE);
896     pa_stream_unref(s);
897 }
898
899 static void create_stream_complete(pa_stream *s) {
900     pa_assert(s);
901     pa_assert(PA_REFCNT_VALUE(s) >= 1);
902     pa_assert(s->state == PA_STREAM_CREATING);
903
904     pa_stream_set_state(s, PA_STREAM_READY);
905
906     if (s->requested_bytes > 0 && s->write_callback)
907         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
908
909     if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
910         s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
911         pa_assert(!s->auto_timing_update_event);
912         s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
913
914         request_auto_timing_update(s, TRUE);
915     }
916
917     check_smoother_status(s, TRUE, FALSE, FALSE);
918 }
919
920 static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
921     const char *e;
922
923     pa_assert(s);
924     pa_assert(attr);
925
926     if ((e = getenv("PULSE_LATENCY_MSEC"))) {
927         uint32_t ms;
928
929         if (pa_atou(e, &ms) < 0 || ms <= 0)
930             pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
931         else {
932             attr->maxlength = (uint32_t) -1;
933             attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &s->sample_spec);
934             attr->minreq = (uint32_t) -1;
935             attr->prebuf = (uint32_t) -1;
936             attr->fragsize = attr->tlength;
937         }
938
939         if (flags)
940             *flags |= PA_STREAM_ADJUST_LATENCY;
941     }
942
943     if (s->context->version >= 13)
944         return;
945
946     /* Version older than 0.9.10 didn't do server side buffer_attr
947      * selection, hence we have to fake it on the client side. */
948
949     /* We choose fairly conservative values here, to not confuse
950      * old clients with extremely large playback buffers */
951
952     if (attr->maxlength == (uint32_t) -1)
953         attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
954
955     if (attr->tlength == (uint32_t) -1)
956         attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */
957
958     if (attr->minreq == (uint32_t) -1)
959         attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
960
961     if (attr->prebuf == (uint32_t) -1)
962         attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
963
964     if (attr->fragsize == (uint32_t) -1)
965         attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
966 }
967
968 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
969     pa_stream *s = userdata;
970     uint32_t requested_bytes = 0;
971
972     pa_assert(pd);
973     pa_assert(s);
974     pa_assert(PA_REFCNT_VALUE(s) >= 1);
975     pa_assert(s->state == PA_STREAM_CREATING);
976
977     pa_stream_ref(s);
978
979     if (command != PA_COMMAND_REPLY) {
980         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
981             goto finish;
982
983         pa_stream_set_state(s, PA_STREAM_FAILED);
984         goto finish;
985     }
986
987     if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
988         s->channel == PA_INVALID_INDEX ||
989         ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
990         ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
991         pa_context_fail(s->context, PA_ERR_PROTOCOL);
992         goto finish;
993     }
994
995     s->requested_bytes = (int64_t) requested_bytes;
996
997     if (s->context->version >= 9) {
998         if (s->direction == PA_STREAM_PLAYBACK) {
999             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1000                 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
1001                 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
1002                 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
1003                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1004                 goto finish;
1005             }
1006         } else if (s->direction == PA_STREAM_RECORD) {
1007             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1008                 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
1009                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1010                 goto finish;
1011             }
1012         }
1013     }
1014
1015     if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
1016         pa_sample_spec ss;
1017         pa_channel_map cm;
1018         const char *dn = NULL;
1019         pa_bool_t suspended;
1020
1021         if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1022             pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1023             pa_tagstruct_getu32(t, &s->device_index) < 0 ||
1024             pa_tagstruct_gets(t, &dn) < 0 ||
1025             pa_tagstruct_get_boolean(t, &suspended) < 0) {
1026             pa_context_fail(s->context, PA_ERR_PROTOCOL);
1027             goto finish;
1028         }
1029
1030         if (!dn || s->device_index == PA_INVALID_INDEX ||
1031             ss.channels != cm.channels ||
1032             !pa_channel_map_valid(&cm) ||
1033             !pa_sample_spec_valid(&ss) ||
1034             (s->n_formats == 0 && (
1035                 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
1036                 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
1037                 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))))) {
1038             pa_context_fail(s->context, PA_ERR_PROTOCOL);
1039             goto finish;
1040         }
1041
1042         pa_xfree(s->device_name);
1043         s->device_name = pa_xstrdup(dn);
1044         s->suspended = suspended;
1045
1046         s->channel_map = cm;
1047         s->sample_spec = ss;
1048     }
1049
1050     if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
1051         pa_usec_t usec;
1052
1053         if (pa_tagstruct_get_usec(t, &usec) < 0) {
1054             pa_context_fail(s->context, PA_ERR_PROTOCOL);
1055             goto finish;
1056         }
1057
1058         if (s->direction == PA_STREAM_RECORD)
1059             s->timing_info.configured_source_usec = usec;
1060         else
1061             s->timing_info.configured_sink_usec = usec;
1062     }
1063
1064     if (s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK) {
1065         pa_format_info *f = pa_format_info_new();
1066         pa_tagstruct_get_format_info(t, f);
1067
1068         if (pa_format_info_valid(f))
1069             s->format = f;
1070         else {
1071             pa_format_info_free(f);
1072             if (s->n_formats > 0) {
1073                 /* We used the extended API, so we should have got back a proper format */
1074                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1075                 goto finish;
1076             }
1077         }
1078     }
1079
1080     if (!pa_tagstruct_eof(t)) {
1081         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1082         goto finish;
1083     }
1084
1085     if (s->direction == PA_STREAM_RECORD) {
1086         pa_assert(!s->record_memblockq);
1087
1088         s->record_memblockq = pa_memblockq_new(
1089                 0,
1090                 s->buffer_attr.maxlength,
1091                 0,
1092                 pa_frame_size(&s->sample_spec),
1093                 1,
1094                 0,
1095                 0,
1096                 NULL);
1097     }
1098
1099     s->channel_valid = TRUE;
1100     pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);
1101
1102     create_stream_complete(s);
1103
1104 finish:
1105     pa_stream_unref(s);
1106 }
1107
1108 static int create_stream(
1109         pa_stream_direction_t direction,
1110         pa_stream *s,
1111         const char *dev,
1112         const pa_buffer_attr *attr,
1113         pa_stream_flags_t flags,
1114         const pa_cvolume *volume,
1115         pa_stream *sync_stream) {
1116
1117     pa_tagstruct *t;
1118     uint32_t tag;
1119     pa_bool_t volume_set = FALSE;
1120     uint32_t i;
1121
1122     pa_assert(s);
1123     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1124     pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1125
1126     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1127     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1128     PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1129     PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1130                                               PA_STREAM_INTERPOLATE_TIMING|
1131                                               PA_STREAM_NOT_MONOTONIC|
1132                                               PA_STREAM_AUTO_TIMING_UPDATE|
1133                                               PA_STREAM_NO_REMAP_CHANNELS|
1134                                               PA_STREAM_NO_REMIX_CHANNELS|
1135                                               PA_STREAM_FIX_FORMAT|
1136                                               PA_STREAM_FIX_RATE|
1137                                               PA_STREAM_FIX_CHANNELS|
1138                                               PA_STREAM_DONT_MOVE|
1139                                               PA_STREAM_VARIABLE_RATE|
1140                                               PA_STREAM_PEAK_DETECT|
1141                                               PA_STREAM_START_MUTED|
1142                                               PA_STREAM_ADJUST_LATENCY|
1143                                               PA_STREAM_EARLY_REQUESTS|
1144                                               PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1145                                               PA_STREAM_START_UNMUTED|
1146                                               PA_STREAM_FAIL_ON_SUSPEND|
1147                                               PA_STREAM_RELATIVE_VOLUME|
1148                                               PA_STREAM_PASSTHROUGH)), PA_ERR_INVALID);
1149
1150
1151     PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1152     PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1153     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1154     /* Althought some of the other flags are not supported on older
1155      * version, we don't check for them here, because it doesn't hurt
1156      * when they are passed but actually not supported. This makes
1157      * client development easier */
1158
1159     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1160     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1161     PA_CHECK_VALIDITY(s->context, !volume || (pa_sample_spec_valid(&s->sample_spec) && volume->channels == s->sample_spec.channels), PA_ERR_INVALID);
1162     PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1163     PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1164
1165     pa_stream_ref(s);
1166
1167     s->direction = direction;
1168
1169     if (sync_stream)
1170         s->syncid = sync_stream->syncid;
1171
1172     if (attr)
1173         s->buffer_attr = *attr;
1174     patch_buffer_attr(s, &s->buffer_attr, &flags);
1175
1176     s->flags = flags;
1177     s->corked = !!(flags & PA_STREAM_START_CORKED);
1178
1179     if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1180         pa_usec_t x;
1181
1182         x = pa_rtclock_now();
1183
1184         pa_assert(!s->smoother);
1185         s->smoother = pa_smoother_new(
1186                 SMOOTHER_ADJUST_TIME,
1187                 SMOOTHER_HISTORY_TIME,
1188                 !(flags & PA_STREAM_NOT_MONOTONIC),
1189                 TRUE,
1190                 SMOOTHER_MIN_HISTORY,
1191                 x,
1192                 TRUE);
1193     }
1194
1195     if (!dev)
1196         dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1197
1198     t = pa_tagstruct_command(
1199             s->context,
1200             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1201             &tag);
1202
1203     if (s->context->version < 13)
1204         pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1205
1206     pa_tagstruct_put(
1207             t,
1208             PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1209             PA_TAG_CHANNEL_MAP, &s->channel_map,
1210             PA_TAG_U32, PA_INVALID_INDEX,
1211             PA_TAG_STRING, dev,
1212             PA_TAG_U32, s->buffer_attr.maxlength,
1213             PA_TAG_BOOLEAN, s->corked,
1214             PA_TAG_INVALID);
1215
1216     if (s->direction == PA_STREAM_PLAYBACK) {
1217         pa_cvolume cv;
1218
1219         pa_tagstruct_put(
1220                 t,
1221                 PA_TAG_U32, s->buffer_attr.tlength,
1222                 PA_TAG_U32, s->buffer_attr.prebuf,
1223                 PA_TAG_U32, s->buffer_attr.minreq,
1224                 PA_TAG_U32, s->syncid,
1225                 PA_TAG_INVALID);
1226
1227         volume_set = !!volume;
1228
1229         if (!volume) {
1230             if (pa_sample_spec_valid(&s->sample_spec))
1231                 volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1232             else {
1233                 /* This is not really relevant, since no volume was set, and
1234                  * the real number of channels is embedded in the format_info
1235                  * structure */
1236                 volume = pa_cvolume_reset(&cv, PA_CHANNELS_MAX);
1237             }
1238         }
1239
1240         pa_tagstruct_put_cvolume(t, volume);
1241     } else
1242         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1243
1244     if (s->context->version >= 12) {
1245         pa_tagstruct_put(
1246                 t,
1247                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1248                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1249                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1250                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1251                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1252                 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1253                 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1254                 PA_TAG_INVALID);
1255     }
1256
1257     if (s->context->version >= 13) {
1258
1259         if (s->direction == PA_STREAM_PLAYBACK)
1260             pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1261         else
1262             pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1263
1264         pa_tagstruct_put(
1265                 t,
1266                 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1267                 PA_TAG_PROPLIST, s->proplist,
1268                 PA_TAG_INVALID);
1269
1270         if (s->direction == PA_STREAM_RECORD)
1271             pa_tagstruct_putu32(t, s->direct_on_input);
1272     }
1273
1274     if (s->context->version >= 14) {
1275
1276         if (s->direction == PA_STREAM_PLAYBACK)
1277             pa_tagstruct_put_boolean(t, volume_set);
1278
1279         pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1280     }
1281
1282     if (s->context->version >= 15) {
1283
1284         if (s->direction == PA_STREAM_PLAYBACK)
1285             pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1286
1287         pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1288         pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1289     }
1290
1291     if (s->context->version >= 17) {
1292
1293         if (s->direction == PA_STREAM_PLAYBACK)
1294             pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1295
1296     }
1297
1298     if (s->context->version >= 18) {
1299
1300         if (s->direction == PA_STREAM_PLAYBACK)
1301             pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1302     }
1303
1304     if (s->context->version >= 21) {
1305
1306         if (s->direction == PA_STREAM_PLAYBACK) {
1307             pa_tagstruct_putu8(t, s->n_formats);
1308             for (i = 0; i < s->n_formats; i++)
1309                 pa_tagstruct_put_format_info(t, s->req_formats[i]);
1310         }
1311     }
1312
1313     pa_pstream_send_tagstruct(s->context->pstream, t);
1314     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1315
1316     pa_stream_set_state(s, PA_STREAM_CREATING);
1317
1318     pa_stream_unref(s);
1319     return 0;
1320 }
1321
1322 int pa_stream_connect_playback(
1323         pa_stream *s,
1324         const char *dev,
1325         const pa_buffer_attr *attr,
1326         pa_stream_flags_t flags,
1327         const pa_cvolume *volume,
1328         pa_stream *sync_stream) {
1329
1330     pa_assert(s);
1331     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1332
1333     return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1334 }
1335
1336 int pa_stream_connect_record(
1337         pa_stream *s,
1338         const char *dev,
1339         const pa_buffer_attr *attr,
1340         pa_stream_flags_t flags) {
1341
1342     pa_assert(s);
1343     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1344
1345     return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1346 }
1347
1348 int pa_stream_begin_write(
1349         pa_stream *s,
1350         void **data,
1351         size_t *nbytes) {
1352
1353     pa_assert(s);
1354     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1355
1356     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1357     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1358     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1359     PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1360     PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1361
1362     if (*nbytes != (size_t) -1) {
1363         size_t m, fs;
1364
1365         m = pa_mempool_block_size_max(s->context->mempool);
1366         fs = pa_frame_size(&s->sample_spec);
1367
1368         m = (m / fs) * fs;
1369         if (*nbytes > m)
1370             *nbytes = m;
1371     }
1372
1373     if (!s->write_memblock) {
1374         s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1375         s->write_data = pa_memblock_acquire(s->write_memblock);
1376     }
1377
1378     *data = s->write_data;
1379     *nbytes = pa_memblock_get_length(s->write_memblock);
1380
1381     return 0;
1382 }
1383
1384 int pa_stream_cancel_write(
1385         pa_stream *s) {
1386
1387     pa_assert(s);
1388     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1389
1390     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1391     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1392     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1393     PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1394
1395     pa_assert(s->write_data);
1396
1397     pa_memblock_release(s->write_memblock);
1398     pa_memblock_unref(s->write_memblock);
1399     s->write_memblock = NULL;
1400     s->write_data = NULL;
1401
1402     return 0;
1403 }
1404
1405 int pa_stream_write(
1406         pa_stream *s,
1407         const void *data,
1408         size_t length,
1409         pa_free_cb_t free_cb,
1410         int64_t offset,
1411         pa_seek_mode_t seek) {
1412
1413     pa_assert(s);
1414     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1415     pa_assert(data);
1416
1417     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1418     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1419     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1420     PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1421     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1422     PA_CHECK_VALIDITY(s->context,
1423                       !s->write_memblock ||
1424                       ((data >= s->write_data) &&
1425                        ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1426                       PA_ERR_INVALID);
1427     PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1428
1429     if (s->write_memblock) {
1430         pa_memchunk chunk;
1431
1432         /* pa_stream_write_begin() was called before */
1433
1434         pa_memblock_release(s->write_memblock);
1435
1436         chunk.memblock = s->write_memblock;
1437         chunk.index = (const char *) data - (const char *) s->write_data;
1438         chunk.length = length;
1439
1440         s->write_memblock = NULL;
1441         s->write_data = NULL;
1442
1443         pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1444         pa_memblock_unref(chunk.memblock);
1445
1446     } else {
1447         pa_seek_mode_t t_seek = seek;
1448         int64_t t_offset = offset;
1449         size_t t_length = length;
1450         const void *t_data = data;
1451
1452         /* pa_stream_write_begin() was not called before */
1453
1454         while (t_length > 0) {
1455             pa_memchunk chunk;
1456
1457             chunk.index = 0;
1458
1459             if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1460                 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1461                 chunk.length = t_length;
1462             } else {
1463                 void *d;
1464
1465                 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1466                 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1467
1468                 d = pa_memblock_acquire(chunk.memblock);
1469                 memcpy(d, t_data, chunk.length);
1470                 pa_memblock_release(chunk.memblock);
1471             }
1472
1473             pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1474
1475             t_offset = 0;
1476             t_seek = PA_SEEK_RELATIVE;
1477
1478             t_data = (const uint8_t*) t_data + chunk.length;
1479             t_length -= chunk.length;
1480
1481             pa_memblock_unref(chunk.memblock);
1482         }
1483
1484         if (free_cb && pa_pstream_get_shm(s->context->pstream))
1485             free_cb((void*) data);
1486     }
1487
1488     /* This is obviously wrong since we ignore the seeking index . But
1489      * that's OK, the server side applies the same error */
1490     s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1491
1492     /* pa_log("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes); */
1493
1494     if (s->direction == PA_STREAM_PLAYBACK) {
1495
1496         /* Update latency request correction */
1497         if (s->write_index_corrections[s->current_write_index_correction].valid) {
1498
1499             if (seek == PA_SEEK_ABSOLUTE) {
1500                 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1501                 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1502                 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1503             } else if (seek == PA_SEEK_RELATIVE) {
1504                 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1505                     s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1506             } else
1507                 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1508         }
1509
1510         /* Update the write index in the already available latency data */
1511         if (s->timing_info_valid) {
1512
1513             if (seek == PA_SEEK_ABSOLUTE) {
1514                 s->timing_info.write_index_corrupt = FALSE;
1515                 s->timing_info.write_index = offset + (int64_t) length;
1516             } else if (seek == PA_SEEK_RELATIVE) {
1517                 if (!s->timing_info.write_index_corrupt)
1518                     s->timing_info.write_index += offset + (int64_t) length;
1519             } else
1520                 s->timing_info.write_index_corrupt = TRUE;
1521         }
1522
1523         if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1524             request_auto_timing_update(s, TRUE);
1525     }
1526
1527     return 0;
1528 }
1529
1530 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1531     pa_assert(s);
1532     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1533     pa_assert(data);
1534     pa_assert(length);
1535
1536     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1537     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1538     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1539
1540     if (!s->peek_memchunk.memblock) {
1541
1542         if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1543             *data = NULL;
1544             *length = 0;
1545             return 0;
1546         }
1547
1548         s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1549     }
1550
1551     pa_assert(s->peek_data);
1552     *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1553     *length = s->peek_memchunk.length;
1554     return 0;
1555 }
1556
1557 int pa_stream_drop(pa_stream *s) {
1558     pa_assert(s);
1559     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1560
1561     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1562     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1563     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1564     PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1565
1566     pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1567
1568     /* Fix the simulated local read index */
1569     if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1570         s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1571
1572     pa_assert(s->peek_data);
1573     pa_memblock_release(s->peek_memchunk.memblock);
1574     pa_memblock_unref(s->peek_memchunk.memblock);
1575     pa_memchunk_reset(&s->peek_memchunk);
1576
1577     return 0;
1578 }
1579
1580 size_t pa_stream_writable_size(pa_stream *s) {
1581     pa_assert(s);
1582     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1583
1584     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1585     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1586     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1587
1588     return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1589 }
1590
1591 size_t pa_stream_readable_size(pa_stream *s) {
1592     pa_assert(s);
1593     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1594
1595     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1596     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1597     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1598
1599     return pa_memblockq_get_length(s->record_memblockq);
1600 }
1601
1602 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1603     pa_operation *o;
1604     pa_tagstruct *t;
1605     uint32_t tag;
1606
1607     pa_assert(s);
1608     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1609
1610     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1611     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1612     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1613
1614     /* Ask for a timing update before we cork/uncork to get the best
1615      * accuracy for the transport latency suitable for the
1616      * check_smoother_status() call in the started callback */
1617     request_auto_timing_update(s, TRUE);
1618
1619     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1620
1621     t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1622     pa_tagstruct_putu32(t, s->channel);
1623     pa_pstream_send_tagstruct(s->context->pstream, t);
1624     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1625
1626     /* This might cause the read index to conitnue again, hence
1627      * let's request a timing update */
1628     request_auto_timing_update(s, TRUE);
1629
1630     return o;
1631 }
1632
1633 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1634     pa_usec_t usec;
1635
1636     pa_assert(s);
1637     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1638     pa_assert(s->state == PA_STREAM_READY);
1639     pa_assert(s->direction != PA_STREAM_UPLOAD);
1640     pa_assert(s->timing_info_valid);
1641     pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1642     pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1643
1644     if (s->direction == PA_STREAM_PLAYBACK) {
1645         /* The last byte that was written into the output device
1646          * had this time value associated */
1647         usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1648
1649         if (!s->corked && !s->suspended) {
1650
1651             if (!ignore_transport)
1652                 /* Because the latency info took a little time to come
1653                  * to us, we assume that the real output time is actually
1654                  * a little ahead */
1655                 usec += s->timing_info.transport_usec;
1656
1657             /* However, the output device usually maintains a buffer
1658                too, hence the real sample currently played is a little
1659                back  */
1660             if (s->timing_info.sink_usec >= usec)
1661                 usec = 0;
1662             else
1663                 usec -= s->timing_info.sink_usec;
1664         }
1665
1666     } else {
1667         pa_assert(s->direction == PA_STREAM_RECORD);
1668
1669         /* The last byte written into the server side queue had
1670          * this time value associated */
1671         usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1672
1673         if (!s->corked && !s->suspended) {
1674
1675             if (!ignore_transport)
1676                 /* Add transport latency */
1677                 usec += s->timing_info.transport_usec;
1678
1679             /* Add latency of data in device buffer */
1680             usec += s->timing_info.source_usec;
1681
1682             /* If this is a monitor source, we need to correct the
1683              * time by the playback device buffer */
1684             if (s->timing_info.sink_usec >= usec)
1685                 usec = 0;
1686             else
1687                 usec -= s->timing_info.sink_usec;
1688         }
1689     }
1690
1691     return usec;
1692 }
1693
1694 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1695     pa_operation *o = userdata;
1696     struct timeval local, remote, now;
1697     pa_timing_info *i;
1698     pa_bool_t playing = FALSE;
1699     uint64_t underrun_for = 0, playing_for = 0;
1700
1701     pa_assert(pd);
1702     pa_assert(o);
1703     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1704
1705     if (!o->context || !o->stream)
1706         goto finish;
1707
1708     i = &o->stream->timing_info;
1709
1710     o->stream->timing_info_valid = FALSE;
1711     i->write_index_corrupt = TRUE;
1712     i->read_index_corrupt = TRUE;
1713
1714     if (command != PA_COMMAND_REPLY) {
1715         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1716             goto finish;
1717
1718     } else {
1719
1720         if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1721             pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1722             pa_tagstruct_get_boolean(t, &playing) < 0 ||
1723             pa_tagstruct_get_timeval(t, &local) < 0 ||
1724             pa_tagstruct_get_timeval(t, &remote) < 0 ||
1725             pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1726             pa_tagstruct_gets64(t, &i->read_index) < 0) {
1727
1728             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1729             goto finish;
1730         }
1731
1732         if (o->context->version >= 13 &&
1733             o->stream->direction == PA_STREAM_PLAYBACK)
1734             if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1735                 pa_tagstruct_getu64(t, &playing_for) < 0) {
1736
1737                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1738                 goto finish;
1739             }
1740
1741
1742         if (!pa_tagstruct_eof(t)) {
1743             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1744             goto finish;
1745         }
1746         o->stream->timing_info_valid = TRUE;
1747         i->write_index_corrupt = FALSE;
1748         i->read_index_corrupt = FALSE;
1749
1750         i->playing = (int) playing;
1751         i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1752
1753         pa_gettimeofday(&now);
1754
1755         /* Calculcate timestamps */
1756         if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1757             /* local and remote seem to have synchronized clocks */
1758
1759             if (o->stream->direction == PA_STREAM_PLAYBACK)
1760                 i->transport_usec = pa_timeval_diff(&remote, &local);
1761             else
1762                 i->transport_usec = pa_timeval_diff(&now, &remote);
1763
1764             i->synchronized_clocks = TRUE;
1765             i->timestamp = remote;
1766         } else {
1767             /* clocks are not synchronized, let's estimate latency then */
1768             i->transport_usec = pa_timeval_diff(&now, &local)/2;
1769             i->synchronized_clocks = FALSE;
1770             i->timestamp = local;
1771             pa_timeval_add(&i->timestamp, i->transport_usec);
1772         }
1773
1774         /* Invalidate read and write indexes if necessary */
1775         if (tag < o->stream->read_index_not_before)
1776             i->read_index_corrupt = TRUE;
1777
1778         if (tag < o->stream->write_index_not_before)
1779             i->write_index_corrupt = TRUE;
1780
1781         if (o->stream->direction == PA_STREAM_PLAYBACK) {
1782             /* Write index correction */
1783
1784             int n, j;
1785             uint32_t ctag = tag;
1786
1787             /* Go through the saved correction values and add up the
1788              * total correction.*/
1789             for (n = 0, j = o->stream->current_write_index_correction+1;
1790                  n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1791                  n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1792
1793                 /* Step over invalid data or out-of-date data */
1794                 if (!o->stream->write_index_corrections[j].valid ||
1795                     o->stream->write_index_corrections[j].tag < ctag)
1796                     continue;
1797
1798                 /* Make sure that everything is in order */
1799                 ctag = o->stream->write_index_corrections[j].tag+1;
1800
1801                 /* Now fix the write index */
1802                 if (o->stream->write_index_corrections[j].corrupt) {
1803                     /* A corrupting seek was made */
1804                     i->write_index_corrupt = TRUE;
1805                 } else if (o->stream->write_index_corrections[j].absolute) {
1806                     /* An absolute seek was made */
1807                     i->write_index = o->stream->write_index_corrections[j].value;
1808                     i->write_index_corrupt = FALSE;
1809                 } else if (!i->write_index_corrupt) {
1810                     /* A relative seek was made */
1811                     i->write_index += o->stream->write_index_corrections[j].value;
1812                 }
1813             }
1814
1815             /* Clear old correction entries */
1816             for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1817                 if (!o->stream->write_index_corrections[n].valid)
1818                     continue;
1819
1820                 if (o->stream->write_index_corrections[n].tag <= tag)
1821                     o->stream->write_index_corrections[n].valid = FALSE;
1822             }
1823         }
1824
1825         if (o->stream->direction == PA_STREAM_RECORD) {
1826             /* Read index correction */
1827
1828             if (!i->read_index_corrupt)
1829                 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1830         }
1831
1832         /* Update smoother if we're not corked */
1833         if (o->stream->smoother && !o->stream->corked) {
1834             pa_usec_t u, x;
1835
1836             u = x = pa_rtclock_now() - i->transport_usec;
1837
1838             if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1839                 pa_usec_t su;
1840
1841                 /* If we weren't playing then it will take some time
1842                  * until the audio will actually come out through the
1843                  * speakers. Since we follow that timing here, we need
1844                  * to try to fix this up */
1845
1846                 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1847
1848                 if (su < i->sink_usec)
1849                     x += i->sink_usec - su;
1850             }
1851
1852             if (!i->playing)
1853                 pa_smoother_pause(o->stream->smoother, x);
1854
1855             /* Update the smoother */
1856             if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1857                 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1858                 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1859
1860             if (i->playing)
1861                 pa_smoother_resume(o->stream->smoother, x, TRUE);
1862         }
1863     }
1864
1865     o->stream->auto_timing_update_requested = FALSE;
1866
1867     if (o->stream->latency_update_callback)
1868         o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1869
1870     if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1871         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1872         cb(o->stream, o->stream->timing_info_valid, o->userdata);
1873     }
1874
1875 finish:
1876
1877     pa_operation_done(o);
1878     pa_operation_unref(o);
1879 }
1880
1881 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1882     uint32_t tag;
1883     pa_operation *o;
1884     pa_tagstruct *t;
1885     struct timeval now;
1886     int cidx = 0;
1887
1888     pa_assert(s);
1889     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1890
1891     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1892     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1893     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1894
1895     if (s->direction == PA_STREAM_PLAYBACK) {
1896         /* Find a place to store the write_index correction data for this entry */
1897         cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1898
1899         /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1900         PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1901     }
1902     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1903
1904     t = pa_tagstruct_command(
1905             s->context,
1906             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1907             &tag);
1908     pa_tagstruct_putu32(t, s->channel);
1909     pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1910
1911     pa_pstream_send_tagstruct(s->context->pstream, t);
1912     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1913
1914     if (s->direction == PA_STREAM_PLAYBACK) {
1915         /* Fill in initial correction data */
1916
1917         s->current_write_index_correction = cidx;
1918
1919         s->write_index_corrections[cidx].valid = TRUE;
1920         s->write_index_corrections[cidx].absolute = FALSE;
1921         s->write_index_corrections[cidx].corrupt = FALSE;
1922         s->write_index_corrections[cidx].tag = tag;
1923         s->write_index_corrections[cidx].value = 0;
1924     }
1925
1926     return o;
1927 }
1928
1929 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1930     pa_stream *s = userdata;
1931
1932     pa_assert(pd);
1933     pa_assert(s);
1934     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1935
1936     pa_stream_ref(s);
1937
1938     if (command != PA_COMMAND_REPLY) {
1939         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1940             goto finish;
1941
1942         pa_stream_set_state(s, PA_STREAM_FAILED);
1943         goto finish;
1944     } else if (!pa_tagstruct_eof(t)) {
1945         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1946         goto finish;
1947     }
1948
1949     pa_stream_set_state(s, PA_STREAM_TERMINATED);
1950
1951 finish:
1952     pa_stream_unref(s);
1953 }
1954
1955 int pa_stream_disconnect(pa_stream *s) {
1956     pa_tagstruct *t;
1957     uint32_t tag;
1958
1959     pa_assert(s);
1960     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1961
1962     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1963     PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1964     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1965
1966     pa_stream_ref(s);
1967
1968     t = pa_tagstruct_command(
1969             s->context,
1970             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1971                         (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1972             &tag);
1973     pa_tagstruct_putu32(t, s->channel);
1974     pa_pstream_send_tagstruct(s->context->pstream, t);
1975     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1976
1977     pa_stream_unref(s);
1978     return 0;
1979 }
1980
1981 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1982     pa_assert(s);
1983     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1984
1985     if (pa_detect_fork())
1986         return;
1987
1988     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1989         return;
1990
1991     s->read_callback = cb;
1992     s->read_userdata = userdata;
1993 }
1994
1995 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1996     pa_assert(s);
1997     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1998
1999     if (pa_detect_fork())
2000         return;
2001
2002     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2003         return;
2004
2005     s->write_callback = cb;
2006     s->write_userdata = userdata;
2007 }
2008
2009 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2010     pa_assert(s);
2011     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2012
2013     if (pa_detect_fork())
2014         return;
2015
2016     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2017         return;
2018
2019     s->state_callback = cb;
2020     s->state_userdata = userdata;
2021 }
2022
2023 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2024     pa_assert(s);
2025     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2026
2027     if (pa_detect_fork())
2028         return;
2029
2030     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2031         return;
2032
2033     s->overflow_callback = cb;
2034     s->overflow_userdata = userdata;
2035 }
2036
2037 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2038     pa_assert(s);
2039     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2040
2041     if (pa_detect_fork())
2042         return;
2043
2044     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2045         return;
2046
2047     s->underflow_callback = cb;
2048     s->underflow_userdata = userdata;
2049 }
2050
2051 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2052     pa_assert(s);
2053     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2054
2055     if (pa_detect_fork())
2056         return;
2057
2058     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2059         return;
2060
2061     s->latency_update_callback = cb;
2062     s->latency_update_userdata = userdata;
2063 }
2064
2065 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2066     pa_assert(s);
2067     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2068
2069     if (pa_detect_fork())
2070         return;
2071
2072     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2073         return;
2074
2075     s->moved_callback = cb;
2076     s->moved_userdata = userdata;
2077 }
2078
2079 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2080     pa_assert(s);
2081     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2082
2083     if (pa_detect_fork())
2084         return;
2085
2086     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2087         return;
2088
2089     s->suspended_callback = cb;
2090     s->suspended_userdata = userdata;
2091 }
2092
2093 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2094     pa_assert(s);
2095     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2096
2097     if (pa_detect_fork())
2098         return;
2099
2100     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2101         return;
2102
2103     s->started_callback = cb;
2104     s->started_userdata = userdata;
2105 }
2106
2107 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
2108     pa_assert(s);
2109     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2110
2111     if (pa_detect_fork())
2112         return;
2113
2114     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2115         return;
2116
2117     s->event_callback = cb;
2118     s->event_userdata = userdata;
2119 }
2120
2121 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2122     pa_assert(s);
2123     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2124
2125     if (pa_detect_fork())
2126         return;
2127
2128     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2129         return;
2130
2131     s->buffer_attr_callback = cb;
2132     s->buffer_attr_userdata = userdata;
2133 }
2134
2135 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2136     pa_operation *o = userdata;
2137     int success = 1;
2138
2139     pa_assert(pd);
2140     pa_assert(o);
2141     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2142
2143     if (!o->context)
2144         goto finish;
2145
2146     if (command != PA_COMMAND_REPLY) {
2147         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2148             goto finish;
2149
2150         success = 0;
2151     } else if (!pa_tagstruct_eof(t)) {
2152         pa_context_fail(o->context, PA_ERR_PROTOCOL);
2153         goto finish;
2154     }
2155
2156     if (o->callback) {
2157         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2158         cb(o->stream, success, o->userdata);
2159     }
2160
2161 finish:
2162     pa_operation_done(o);
2163     pa_operation_unref(o);
2164 }
2165
2166 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2167     pa_operation *o;
2168     pa_tagstruct *t;
2169     uint32_t tag;
2170
2171     pa_assert(s);
2172     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2173
2174     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2175     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2176     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2177
2178     /* Ask for a timing update before we cork/uncork to get the best
2179      * accuracy for the transport latency suitable for the
2180      * check_smoother_status() call in the started callback */
2181     request_auto_timing_update(s, TRUE);
2182
2183     s->corked = b;
2184
2185     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2186
2187     t = pa_tagstruct_command(
2188             s->context,
2189             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2190             &tag);
2191     pa_tagstruct_putu32(t, s->channel);
2192     pa_tagstruct_put_boolean(t, !!b);
2193     pa_pstream_send_tagstruct(s->context->pstream, t);
2194     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2195
2196     check_smoother_status(s, FALSE, FALSE, FALSE);
2197
2198     /* This might cause the indexes to hang/start again, hence let's
2199      * request a timing update, after the cork/uncork, too */
2200     request_auto_timing_update(s, TRUE);
2201
2202     return o;
2203 }
2204
2205 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2206     pa_tagstruct *t;
2207     pa_operation *o;
2208     uint32_t tag;
2209
2210     pa_assert(s);
2211     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2212
2213     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2214     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2215
2216     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2217
2218     t = pa_tagstruct_command(s->context, command, &tag);
2219     pa_tagstruct_putu32(t, s->channel);
2220     pa_pstream_send_tagstruct(s->context->pstream, t);
2221     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2222
2223     return o;
2224 }
2225
2226 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2227     pa_operation *o;
2228
2229     pa_assert(s);
2230     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2231
2232     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2233     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2234     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2235
2236     /* Ask for a timing update *before* the flush, so that the
2237      * transport usec is as up to date as possible when we get the
2238      * underflow message and update the smoother status*/
2239     request_auto_timing_update(s, TRUE);
2240
2241     if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2242         return NULL;
2243
2244     if (s->direction == PA_STREAM_PLAYBACK) {
2245
2246         if (s->write_index_corrections[s->current_write_index_correction].valid)
2247             s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2248
2249         if (s->buffer_attr.prebuf > 0)
2250             check_smoother_status(s, FALSE, FALSE, TRUE);
2251
2252         /* This will change the write index, but leave the
2253          * read index untouched. */
2254         invalidate_indexes(s, FALSE, TRUE);
2255
2256     } else
2257         /* For record streams this has no influence on the write
2258          * index, but the read index might jump. */
2259         invalidate_indexes(s, TRUE, FALSE);
2260
2261     /* Note that we do not update requested_bytes here. This is
2262      * because we cannot really know how data actually was dropped
2263      * from the write index due to this. This 'error' will be applied
2264      * by both client and server and hence we should be fine. */
2265
2266     return o;
2267 }
2268
2269 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2270     pa_operation *o;
2271
2272     pa_assert(s);
2273     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2274
2275     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2276     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2277     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2278     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2279
2280     /* Ask for a timing update before we cork/uncork to get the best
2281      * accuracy for the transport latency suitable for the
2282      * check_smoother_status() call in the started callback */
2283     request_auto_timing_update(s, TRUE);
2284
2285     if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2286         return NULL;
2287
2288     /* This might cause the read index to hang again, hence
2289      * let's request a timing update */
2290     request_auto_timing_update(s, TRUE);
2291
2292     return o;
2293 }
2294
2295 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2296     pa_operation *o;
2297
2298     pa_assert(s);
2299     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2300
2301     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2302     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2303     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2304     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2305
2306     /* Ask for a timing update before we cork/uncork to get the best
2307      * accuracy for the transport latency suitable for the
2308      * check_smoother_status() call in the started callback */
2309     request_auto_timing_update(s, TRUE);
2310
2311     if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2312         return NULL;
2313
2314     /* This might cause the read index to start moving again, hence
2315      * let's request a timing update */
2316     request_auto_timing_update(s, TRUE);
2317
2318     return o;
2319 }
2320
2321 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2322     pa_operation *o;
2323
2324     pa_assert(s);
2325     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2326     pa_assert(name);
2327
2328     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2329     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2330     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2331
2332     if (s->context->version >= 13) {
2333         pa_proplist *p = pa_proplist_new();
2334
2335         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2336         o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2337         pa_proplist_free(p);
2338     } else {
2339         pa_tagstruct *t;
2340         uint32_t tag;
2341
2342         o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2343         t = pa_tagstruct_command(
2344                 s->context,
2345                 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2346                 &tag);
2347         pa_tagstruct_putu32(t, s->channel);
2348         pa_tagstruct_puts(t, name);
2349         pa_pstream_send_tagstruct(s->context->pstream, t);
2350         pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2351     }
2352
2353     return o;
2354 }
2355
2356 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2357     pa_usec_t usec;
2358
2359     pa_assert(s);
2360     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2361
2362     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2363     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2364     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2365     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2366     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2367     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2368
2369     if (s->smoother)
2370         usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2371     else
2372         usec = calc_time(s, FALSE);
2373
2374     /* Make sure the time runs monotonically */
2375     if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2376         if (usec < s->previous_time)
2377             usec = s->previous_time;
2378         else
2379             s->previous_time = usec;
2380     }
2381
2382     if (r_usec)
2383         *r_usec = usec;
2384
2385     return 0;
2386 }
2387
2388 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2389     pa_assert(s);
2390     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2391
2392     if (negative)
2393         *negative = 0;
2394
2395     if (a >= b)
2396         return a-b;
2397     else {
2398         if (negative && s->direction == PA_STREAM_RECORD) {
2399             *negative = 1;
2400             return b-a;
2401         } else
2402             return 0;
2403     }
2404 }
2405
2406 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2407     pa_usec_t t, c;
2408     int r;
2409     int64_t cindex;
2410
2411     pa_assert(s);
2412     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2413     pa_assert(r_usec);
2414
2415     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2416     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2417     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2418     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2419     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2420     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2421
2422     if ((r = pa_stream_get_time(s, &t)) < 0)
2423         return r;
2424
2425     if (s->direction == PA_STREAM_PLAYBACK)
2426         cindex = s->timing_info.write_index;
2427     else
2428         cindex = s->timing_info.read_index;
2429
2430     if (cindex < 0)
2431         cindex = 0;
2432
2433     c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2434
2435     if (s->direction == PA_STREAM_PLAYBACK)
2436         *r_usec = time_counter_diff(s, c, t, negative);
2437     else
2438         *r_usec = time_counter_diff(s, t, c, negative);
2439
2440     return 0;
2441 }
2442
2443 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2444     pa_assert(s);
2445     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2446
2447     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2448     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2449     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2450     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2451
2452     return &s->timing_info;
2453 }
2454
2455 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2456     pa_assert(s);
2457     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2458
2459     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2460
2461     return &s->sample_spec;
2462 }
2463
2464 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2465     pa_assert(s);
2466     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2467
2468     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2469
2470     return &s->channel_map;
2471 }
2472
2473 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2474     pa_assert(s);
2475     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2476
2477     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2478     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2479     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2480
2481     return &s->buffer_attr;
2482 }
2483
2484 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2485     pa_operation *o = userdata;
2486     int success = 1;
2487
2488     pa_assert(pd);
2489     pa_assert(o);
2490     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2491
2492     if (!o->context)
2493         goto finish;
2494
2495     if (command != PA_COMMAND_REPLY) {
2496         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2497             goto finish;
2498
2499         success = 0;
2500     } else {
2501         if (o->stream->direction == PA_STREAM_PLAYBACK) {
2502             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2503                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2504                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2505                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2506                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2507                 goto finish;
2508             }
2509         } else if (o->stream->direction == PA_STREAM_RECORD) {
2510             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2511                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2512                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2513                 goto finish;
2514             }
2515         }
2516
2517         if (o->stream->context->version >= 13) {
2518             pa_usec_t usec;
2519
2520             if (pa_tagstruct_get_usec(t, &usec) < 0) {
2521                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2522                 goto finish;
2523             }
2524
2525             if (o->stream->direction == PA_STREAM_RECORD)
2526                 o->stream->timing_info.configured_source_usec = usec;
2527             else
2528                 o->stream->timing_info.configured_sink_usec = usec;
2529         }
2530
2531         if (!pa_tagstruct_eof(t)) {
2532             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2533             goto finish;
2534         }
2535     }
2536
2537     if (o->callback) {
2538         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2539         cb(o->stream, success, o->userdata);
2540     }
2541
2542 finish:
2543     pa_operation_done(o);
2544     pa_operation_unref(o);
2545 }
2546
2547
2548 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2549     pa_operation *o;
2550     pa_tagstruct *t;
2551     uint32_t tag;
2552     pa_buffer_attr copy;
2553
2554     pa_assert(s);
2555     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2556     pa_assert(attr);
2557
2558     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2559     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2560     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2561     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2562
2563     /* Ask for a timing update before we cork/uncork to get the best
2564      * accuracy for the transport latency suitable for the
2565      * check_smoother_status() call in the started callback */
2566     request_auto_timing_update(s, TRUE);
2567
2568     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2569
2570     t = pa_tagstruct_command(
2571             s->context,
2572             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2573             &tag);
2574     pa_tagstruct_putu32(t, s->channel);
2575
2576     copy = *attr;
2577     patch_buffer_attr(s, &copy, NULL);
2578     attr = &copy;
2579
2580     pa_tagstruct_putu32(t, attr->maxlength);
2581
2582     if (s->direction == PA_STREAM_PLAYBACK)
2583         pa_tagstruct_put(
2584                 t,
2585                 PA_TAG_U32, attr->tlength,
2586                 PA_TAG_U32, attr->prebuf,
2587                 PA_TAG_U32, attr->minreq,
2588                 PA_TAG_INVALID);
2589     else
2590         pa_tagstruct_putu32(t, attr->fragsize);
2591
2592     if (s->context->version >= 13)
2593         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2594
2595     if (s->context->version >= 14)
2596         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2597
2598     pa_pstream_send_tagstruct(s->context->pstream, t);
2599     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2600
2601     /* This might cause changes in the read/write indexex, hence let's
2602      * request a timing update */
2603     request_auto_timing_update(s, TRUE);
2604
2605     return o;
2606 }
2607
2608 uint32_t pa_stream_get_device_index(pa_stream *s) {
2609     pa_assert(s);
2610     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2611
2612     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2613     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2614     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2615     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2616     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2617
2618     return s->device_index;
2619 }
2620
2621 const char *pa_stream_get_device_name(pa_stream *s) {
2622     pa_assert(s);
2623     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2624
2625     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2626     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2627     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2628     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2629     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2630
2631     return s->device_name;
2632 }
2633
2634 int pa_stream_is_suspended(pa_stream *s) {
2635     pa_assert(s);
2636     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2637
2638     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2639     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2640     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2641     PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2642
2643     return s->suspended;
2644 }
2645
2646 int pa_stream_is_corked(pa_stream *s) {
2647     pa_assert(s);
2648     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2649
2650     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2651     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2652     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2653
2654     return s->corked;
2655 }
2656
2657 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2658     pa_operation *o = userdata;
2659     int success = 1;
2660
2661     pa_assert(pd);
2662     pa_assert(o);
2663     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2664
2665     if (!o->context)
2666         goto finish;
2667
2668     if (command != PA_COMMAND_REPLY) {
2669         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2670             goto finish;
2671
2672         success = 0;
2673     } else {
2674
2675         if (!pa_tagstruct_eof(t)) {
2676             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2677             goto finish;
2678         }
2679     }
2680
2681     o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2682     pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2683
2684     if (o->callback) {
2685         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2686         cb(o->stream, success, o->userdata);
2687     }
2688
2689 finish:
2690     pa_operation_done(o);
2691     pa_operation_unref(o);
2692 }
2693
2694
2695 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2696     pa_operation *o;
2697     pa_tagstruct *t;
2698     uint32_t tag;
2699
2700     pa_assert(s);
2701     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2702
2703     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2704     PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2705     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2706     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2707     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2708     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2709
2710     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2711     o->private = PA_UINT_TO_PTR(rate);
2712
2713     t = pa_tagstruct_command(
2714             s->context,
2715             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2716             &tag);
2717     pa_tagstruct_putu32(t, s->channel);
2718     pa_tagstruct_putu32(t, rate);
2719
2720     pa_pstream_send_tagstruct(s->context->pstream, t);
2721     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2722
2723     return o;
2724 }
2725
2726 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2727     pa_operation *o;
2728     pa_tagstruct *t;
2729     uint32_t tag;
2730
2731     pa_assert(s);
2732     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2733
2734     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2735     PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2736     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2737     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2738     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2739
2740     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2741
2742     t = pa_tagstruct_command(
2743             s->context,
2744             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2745             &tag);
2746     pa_tagstruct_putu32(t, s->channel);
2747     pa_tagstruct_putu32(t, (uint32_t) mode);
2748     pa_tagstruct_put_proplist(t, p);
2749
2750     pa_pstream_send_tagstruct(s->context->pstream, t);
2751     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2752
2753     /* Please note that we don't update s->proplist here, because we
2754      * don't export that field */
2755
2756     return o;
2757 }
2758
2759 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2760     pa_operation *o;
2761     pa_tagstruct *t;
2762     uint32_t tag;
2763     const char * const*k;
2764
2765     pa_assert(s);
2766     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2767
2768     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2769     PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2770     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2771     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2772     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2773
2774     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2775
2776     t = pa_tagstruct_command(
2777             s->context,
2778             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2779             &tag);
2780     pa_tagstruct_putu32(t, s->channel);
2781
2782     for (k = keys; *k; k++)
2783         pa_tagstruct_puts(t, *k);
2784
2785     pa_tagstruct_puts(t, NULL);
2786
2787     pa_pstream_send_tagstruct(s->context->pstream, t);
2788     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2789
2790     /* Please note that we don't update s->proplist here, because we
2791      * don't export that field */
2792
2793     return o;
2794 }
2795
2796 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2797     pa_assert(s);
2798     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2799
2800     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2801     PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2802     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2803     PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2804
2805     s->direct_on_input = sink_input_idx;
2806
2807     return 0;
2808 }
2809
2810 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2811     pa_assert(s);
2812     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2813
2814     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2815     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2816     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2817
2818     return s->direct_on_input;
2819 }