capture: Add the passthrough format negotiation to capture streams.
[profile/ivi/pulseaudio.git] / src / pulse / stream.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/rtclock.h>
34 #include <pulse/xmalloc.h>
35 #include <pulse/fork-detect.h>
36
37 #include <pulsecore/pstream-util.h>
38 #include <pulsecore/log.h>
39 #include <pulsecore/hashmap.h>
40 #include <pulsecore/macro.h>
41 #include <pulsecore/core-rtclock.h>
42 #include <pulsecore/core-util.h>
43
44 #include "internal.h"
45 #include "stream.h"
46
47 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
48 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
49
50 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
51 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
52 #define SMOOTHER_MIN_HISTORY (4)
53
54 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
55     return pa_stream_new_with_proplist(c, name, ss, map, NULL);
56 }
57
58 static void reset_callbacks(pa_stream *s) {
59     s->read_callback = NULL;
60     s->read_userdata = NULL;
61     s->write_callback = NULL;
62     s->write_userdata = NULL;
63     s->state_callback = NULL;
64     s->state_userdata = NULL;
65     s->overflow_callback = NULL;
66     s->overflow_userdata = NULL;
67     s->underflow_callback = NULL;
68     s->underflow_userdata = NULL;
69     s->latency_update_callback = NULL;
70     s->latency_update_userdata = NULL;
71     s->moved_callback = NULL;
72     s->moved_userdata = NULL;
73     s->suspended_callback = NULL;
74     s->suspended_userdata = NULL;
75     s->started_callback = NULL;
76     s->started_userdata = NULL;
77     s->event_callback = NULL;
78     s->event_userdata = NULL;
79     s->buffer_attr_callback = NULL;
80     s->buffer_attr_userdata = NULL;
81 }
82
83 static pa_stream *pa_stream_new_with_proplist_internal(
84         pa_context *c,
85         const char *name,
86         const pa_sample_spec *ss,
87         const pa_channel_map *map,
88         pa_format_info * const *formats,
89         unsigned int n_formats,
90         pa_proplist *p) {
91
92     pa_stream *s;
93     unsigned int i;
94
95     pa_assert(c);
96     pa_assert(PA_REFCNT_VALUE(c) >= 1);
97     pa_assert((ss == NULL && map == NULL) || (formats == NULL && n_formats == 0));
98     pa_assert(n_formats < PA_MAX_FORMATS);
99
100     PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
101     PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
102
103     s = pa_xnew(pa_stream, 1);
104     PA_REFCNT_INIT(s);
105     s->context = c;
106     s->mainloop = c->mainloop;
107
108     s->direction = PA_STREAM_NODIRECTION;
109     s->state = PA_STREAM_UNCONNECTED;
110     s->flags = 0;
111
112     if (ss)
113         s->sample_spec = *ss;
114     else
115         s->sample_spec.format = PA_SAMPLE_INVALID;
116
117     if (map)
118         s->channel_map = *map;
119     else
120         pa_channel_map_init(&s->channel_map);
121
122     s->n_formats = 0;
123     if (formats) {
124         s->n_formats = n_formats;
125         for (i = 0; i < n_formats; i++)
126             s->req_formats[i] = pa_format_info_copy(formats[i]);
127     }
128
129     /* We'll get the final negotiated format after connecting */
130     s->format = NULL;
131
132     s->direct_on_input = PA_INVALID_INDEX;
133
134     s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
135     if (name)
136         pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
137
138     s->channel = 0;
139     s->channel_valid = FALSE;
140     s->syncid = c->csyncid++;
141     s->stream_index = PA_INVALID_INDEX;
142
143     s->requested_bytes = 0;
144     memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
145
146     /* We initialize der target length here, so that if the user
147      * passes no explicit buffering metrics the default is similar to
148      * what older PA versions provided. */
149
150     s->buffer_attr.maxlength = (uint32_t) -1;
151     if (ss)
152         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
153     else {
154         /* FIXME: We assume a worst-case compressed format corresponding to
155          * 48000 Hz, 2 ch, S16 PCM, but this can very well be incorrect */
156         pa_sample_spec tmp_ss = {
157             .format   = PA_SAMPLE_S16NE,
158             .rate     = 48000,
159             .channels = 2,
160         };
161         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &tmp_ss); /* 250ms of buffering */
162     }
163     s->buffer_attr.minreq = (uint32_t) -1;
164     s->buffer_attr.prebuf = (uint32_t) -1;
165     s->buffer_attr.fragsize = (uint32_t) -1;
166
167     s->device_index = PA_INVALID_INDEX;
168     s->device_name = NULL;
169     s->suspended = FALSE;
170     s->corked = FALSE;
171
172     s->write_memblock = NULL;
173     s->write_data = NULL;
174
175     pa_memchunk_reset(&s->peek_memchunk);
176     s->peek_data = NULL;
177     s->record_memblockq = NULL;
178
179     memset(&s->timing_info, 0, sizeof(s->timing_info));
180     s->timing_info_valid = FALSE;
181
182     s->previous_time = 0;
183
184     s->read_index_not_before = 0;
185     s->write_index_not_before = 0;
186     for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
187         s->write_index_corrections[i].valid = 0;
188     s->current_write_index_correction = 0;
189
190     s->auto_timing_update_event = NULL;
191     s->auto_timing_update_requested = FALSE;
192     s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
193
194     reset_callbacks(s);
195
196     s->smoother = NULL;
197
198     /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
199     PA_LLIST_PREPEND(pa_stream, c->streams, s);
200     pa_stream_ref(s);
201
202     return s;
203 }
204
205 pa_stream *pa_stream_new_with_proplist(
206         pa_context *c,
207         const char *name,
208         const pa_sample_spec *ss,
209         const pa_channel_map *map,
210         pa_proplist *p) {
211
212     pa_channel_map tmap;
213
214     PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
215     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
216     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
217     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
218     PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
219
220     if (!map)
221         PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
222
223     return pa_stream_new_with_proplist_internal(c, name, ss, map, NULL, 0, p);
224 }
225
226 pa_stream *pa_stream_new_extended(
227         pa_context *c,
228         const char *name,
229         pa_format_info * const *formats,
230         unsigned int n_formats,
231         pa_proplist *p) {
232
233     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 21, PA_ERR_NOTSUPPORTED);
234
235     return pa_stream_new_with_proplist_internal(c, name, NULL, NULL, formats, n_formats, p);
236 }
237
238 static void stream_unlink(pa_stream *s) {
239     pa_operation *o, *n;
240     pa_assert(s);
241
242     if (!s->context)
243         return;
244
245     /* Detach from context */
246
247     /* Unref all operatio object that point to us */
248     for (o = s->context->operations; o; o = n) {
249         n = o->next;
250
251         if (o->stream == s)
252             pa_operation_cancel(o);
253     }
254
255     /* Drop all outstanding replies for this stream */
256     if (s->context->pdispatch)
257         pa_pdispatch_unregister_reply(s->context->pdispatch, s);
258
259     if (s->channel_valid) {
260         pa_hashmap_remove((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, PA_UINT32_TO_PTR(s->channel));
261         s->channel = 0;
262         s->channel_valid = FALSE;
263     }
264
265     PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
266     pa_stream_unref(s);
267
268     s->context = NULL;
269
270     if (s->auto_timing_update_event) {
271         pa_assert(s->mainloop);
272         s->mainloop->time_free(s->auto_timing_update_event);
273     }
274
275     reset_callbacks(s);
276 }
277
278 static void stream_free(pa_stream *s) {
279     unsigned int i;
280
281     pa_assert(s);
282
283     stream_unlink(s);
284
285     if (s->write_memblock) {
286         pa_memblock_release(s->write_memblock);
287         pa_memblock_unref(s->write_data);
288     }
289
290     if (s->peek_memchunk.memblock) {
291         if (s->peek_data)
292             pa_memblock_release(s->peek_memchunk.memblock);
293         pa_memblock_unref(s->peek_memchunk.memblock);
294     }
295
296     if (s->record_memblockq)
297         pa_memblockq_free(s->record_memblockq);
298
299     if (s->proplist)
300         pa_proplist_free(s->proplist);
301
302     if (s->smoother)
303         pa_smoother_free(s->smoother);
304
305     for (i = 0; i < s->n_formats; i++)
306         pa_format_info_free(s->req_formats[i]);
307
308     if (s->format)
309         pa_format_info_free(s->format);
310
311     pa_xfree(s->device_name);
312     pa_xfree(s);
313 }
314
315 void pa_stream_unref(pa_stream *s) {
316     pa_assert(s);
317     pa_assert(PA_REFCNT_VALUE(s) >= 1);
318
319     if (PA_REFCNT_DEC(s) <= 0)
320         stream_free(s);
321 }
322
323 pa_stream* pa_stream_ref(pa_stream *s) {
324     pa_assert(s);
325     pa_assert(PA_REFCNT_VALUE(s) >= 1);
326
327     PA_REFCNT_INC(s);
328     return s;
329 }
330
331 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
332     pa_assert(s);
333     pa_assert(PA_REFCNT_VALUE(s) >= 1);
334
335     return s->state;
336 }
337
338 pa_context* pa_stream_get_context(pa_stream *s) {
339     pa_assert(s);
340     pa_assert(PA_REFCNT_VALUE(s) >= 1);
341
342     return s->context;
343 }
344
345 uint32_t pa_stream_get_index(pa_stream *s) {
346     pa_assert(s);
347     pa_assert(PA_REFCNT_VALUE(s) >= 1);
348
349     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
350     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
351
352     return s->stream_index;
353 }
354
355 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
356     pa_assert(s);
357     pa_assert(PA_REFCNT_VALUE(s) >= 1);
358
359     if (s->state == st)
360         return;
361
362     pa_stream_ref(s);
363
364     s->state = st;
365
366     if (s->state_callback)
367         s->state_callback(s, s->state_userdata);
368
369     if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
370         stream_unlink(s);
371
372     pa_stream_unref(s);
373 }
374
375 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
376     pa_assert(s);
377     pa_assert(PA_REFCNT_VALUE(s) >= 1);
378
379     if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
380         return;
381
382     if (s->state == PA_STREAM_READY &&
383         (force || !s->auto_timing_update_requested)) {
384         pa_operation *o;
385
386 /*         pa_log("Automatically requesting new timing data"); */
387
388         if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
389             pa_operation_unref(o);
390             s->auto_timing_update_requested = TRUE;
391         }
392     }
393
394     if (s->auto_timing_update_event) {
395         if (s->suspended && !force) {
396             pa_assert(s->mainloop);
397             s->mainloop->time_free(s->auto_timing_update_event);
398             s->auto_timing_update_event = NULL;
399         } else {
400             if (force)
401                 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
402
403             pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
404
405             s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
406         }
407     }
408 }
409
410 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
411     pa_context *c = userdata;
412     pa_stream *s;
413     uint32_t channel;
414
415     pa_assert(pd);
416     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
417     pa_assert(t);
418     pa_assert(c);
419     pa_assert(PA_REFCNT_VALUE(c) >= 1);
420
421     pa_context_ref(c);
422
423     if (pa_tagstruct_getu32(t, &channel) < 0 ||
424         !pa_tagstruct_eof(t)) {
425         pa_context_fail(c, PA_ERR_PROTOCOL);
426         goto finish;
427     }
428
429     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
430         goto finish;
431
432     if (s->state != PA_STREAM_READY)
433         goto finish;
434
435     pa_context_set_error(c, PA_ERR_KILLED);
436     pa_stream_set_state(s, PA_STREAM_FAILED);
437
438 finish:
439     pa_context_unref(c);
440 }
441
442 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
443     pa_usec_t x;
444
445     pa_assert(s);
446     pa_assert(!force_start || !force_stop);
447
448     if (!s->smoother)
449         return;
450
451     x = pa_rtclock_now();
452
453     if (s->timing_info_valid) {
454         if (aposteriori)
455             x -= s->timing_info.transport_usec;
456         else
457             x += s->timing_info.transport_usec;
458     }
459
460     if (s->suspended || s->corked || force_stop)
461         pa_smoother_pause(s->smoother, x);
462     else if (force_start || s->buffer_attr.prebuf == 0) {
463
464         if (!s->timing_info_valid &&
465             !aposteriori &&
466             !force_start &&
467             !force_stop &&
468             s->context->version >= 13) {
469
470             /* If the server supports STARTED events we take them as
471              * indications when audio really starts/stops playing, if
472              * we don't have any timing info yet -- instead of trying
473              * to be smart and guessing the server time. Otherwise the
474              * unknown transport delay add too much noise to our time
475              * calculations. */
476
477             return;
478         }
479
480         pa_smoother_resume(s->smoother, x, TRUE);
481     }
482
483     /* Please note that we have no idea if playback actually started
484      * if prebuf is non-zero! */
485 }
486
487 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata);
488
489 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
490     pa_context *c = userdata;
491     pa_stream *s;
492     uint32_t channel;
493     const char *dn;
494     pa_bool_t suspended;
495     uint32_t di;
496     pa_usec_t usec = 0;
497     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
498
499     pa_assert(pd);
500     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
501     pa_assert(t);
502     pa_assert(c);
503     pa_assert(PA_REFCNT_VALUE(c) >= 1);
504
505     pa_context_ref(c);
506
507     if (c->version < 12) {
508         pa_context_fail(c, PA_ERR_PROTOCOL);
509         goto finish;
510     }
511
512     if (pa_tagstruct_getu32(t, &channel) < 0 ||
513         pa_tagstruct_getu32(t, &di) < 0 ||
514         pa_tagstruct_gets(t, &dn) < 0 ||
515         pa_tagstruct_get_boolean(t, &suspended) < 0) {
516         pa_context_fail(c, PA_ERR_PROTOCOL);
517         goto finish;
518     }
519
520     if (c->version >= 13) {
521
522         if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
523             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
524                 pa_tagstruct_getu32(t, &fragsize) < 0 ||
525                 pa_tagstruct_get_usec(t, &usec) < 0) {
526                 pa_context_fail(c, PA_ERR_PROTOCOL);
527                 goto finish;
528             }
529         } else {
530             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
531                 pa_tagstruct_getu32(t, &tlength) < 0 ||
532                 pa_tagstruct_getu32(t, &prebuf) < 0 ||
533                 pa_tagstruct_getu32(t, &minreq) < 0 ||
534                 pa_tagstruct_get_usec(t, &usec) < 0) {
535                 pa_context_fail(c, PA_ERR_PROTOCOL);
536                 goto finish;
537             }
538         }
539     }
540
541     if (!pa_tagstruct_eof(t)) {
542         pa_context_fail(c, PA_ERR_PROTOCOL);
543         goto finish;
544     }
545
546     if (!dn || di == PA_INVALID_INDEX) {
547         pa_context_fail(c, PA_ERR_PROTOCOL);
548         goto finish;
549     }
550
551     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
552         goto finish;
553
554     if (s->state != PA_STREAM_READY)
555         goto finish;
556
557     if (c->version >= 13) {
558         if (s->direction == PA_STREAM_RECORD)
559             s->timing_info.configured_source_usec = usec;
560         else
561             s->timing_info.configured_sink_usec = usec;
562
563         s->buffer_attr.maxlength = maxlength;
564         s->buffer_attr.fragsize = fragsize;
565         s->buffer_attr.tlength = tlength;
566         s->buffer_attr.prebuf = prebuf;
567         s->buffer_attr.minreq = minreq;
568     }
569
570     pa_xfree(s->device_name);
571     s->device_name = pa_xstrdup(dn);
572     s->device_index = di;
573
574     s->suspended = suspended;
575
576     if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
577         s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
578         s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
579         request_auto_timing_update(s, TRUE);
580     }
581
582     check_smoother_status(s, TRUE, FALSE, FALSE);
583     request_auto_timing_update(s, TRUE);
584
585     if (s->moved_callback)
586         s->moved_callback(s, s->moved_userdata);
587
588 finish:
589     pa_context_unref(c);
590 }
591
592 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
593     pa_context *c = userdata;
594     pa_stream *s;
595     uint32_t channel;
596     pa_usec_t usec = 0;
597     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
598
599     pa_assert(pd);
600     pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
601     pa_assert(t);
602     pa_assert(c);
603     pa_assert(PA_REFCNT_VALUE(c) >= 1);
604
605     pa_context_ref(c);
606
607     if (c->version < 15) {
608         pa_context_fail(c, PA_ERR_PROTOCOL);
609         goto finish;
610     }
611
612     if (pa_tagstruct_getu32(t, &channel) < 0) {
613         pa_context_fail(c, PA_ERR_PROTOCOL);
614         goto finish;
615     }
616
617     if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
618         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
619             pa_tagstruct_getu32(t, &fragsize) < 0 ||
620             pa_tagstruct_get_usec(t, &usec) < 0) {
621             pa_context_fail(c, PA_ERR_PROTOCOL);
622             goto finish;
623         }
624     } else {
625         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
626             pa_tagstruct_getu32(t, &tlength) < 0 ||
627             pa_tagstruct_getu32(t, &prebuf) < 0 ||
628             pa_tagstruct_getu32(t, &minreq) < 0 ||
629             pa_tagstruct_get_usec(t, &usec) < 0) {
630             pa_context_fail(c, PA_ERR_PROTOCOL);
631             goto finish;
632         }
633     }
634
635     if (!pa_tagstruct_eof(t)) {
636         pa_context_fail(c, PA_ERR_PROTOCOL);
637         goto finish;
638     }
639
640     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
641         goto finish;
642
643     if (s->state != PA_STREAM_READY)
644         goto finish;
645
646     if (s->direction == PA_STREAM_RECORD)
647         s->timing_info.configured_source_usec = usec;
648     else
649         s->timing_info.configured_sink_usec = usec;
650
651     s->buffer_attr.maxlength = maxlength;
652     s->buffer_attr.fragsize = fragsize;
653     s->buffer_attr.tlength = tlength;
654     s->buffer_attr.prebuf = prebuf;
655     s->buffer_attr.minreq = minreq;
656
657     request_auto_timing_update(s, TRUE);
658
659     if (s->buffer_attr_callback)
660         s->buffer_attr_callback(s, s->buffer_attr_userdata);
661
662 finish:
663     pa_context_unref(c);
664 }
665
666 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
667     pa_context *c = userdata;
668     pa_stream *s;
669     uint32_t channel;
670     pa_bool_t suspended;
671
672     pa_assert(pd);
673     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
674     pa_assert(t);
675     pa_assert(c);
676     pa_assert(PA_REFCNT_VALUE(c) >= 1);
677
678     pa_context_ref(c);
679
680     if (c->version < 12) {
681         pa_context_fail(c, PA_ERR_PROTOCOL);
682         goto finish;
683     }
684
685     if (pa_tagstruct_getu32(t, &channel) < 0 ||
686         pa_tagstruct_get_boolean(t, &suspended) < 0 ||
687         !pa_tagstruct_eof(t)) {
688         pa_context_fail(c, PA_ERR_PROTOCOL);
689         goto finish;
690     }
691
692     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
693         goto finish;
694
695     if (s->state != PA_STREAM_READY)
696         goto finish;
697
698     s->suspended = suspended;
699
700     if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
701         s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
702         s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
703         request_auto_timing_update(s, TRUE);
704     }
705
706     check_smoother_status(s, TRUE, FALSE, FALSE);
707     request_auto_timing_update(s, TRUE);
708
709     if (s->suspended_callback)
710         s->suspended_callback(s, s->suspended_userdata);
711
712 finish:
713     pa_context_unref(c);
714 }
715
716 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
717     pa_context *c = userdata;
718     pa_stream *s;
719     uint32_t channel;
720
721     pa_assert(pd);
722     pa_assert(command == PA_COMMAND_STARTED);
723     pa_assert(t);
724     pa_assert(c);
725     pa_assert(PA_REFCNT_VALUE(c) >= 1);
726
727     pa_context_ref(c);
728
729     if (c->version < 13) {
730         pa_context_fail(c, PA_ERR_PROTOCOL);
731         goto finish;
732     }
733
734     if (pa_tagstruct_getu32(t, &channel) < 0 ||
735         !pa_tagstruct_eof(t)) {
736         pa_context_fail(c, PA_ERR_PROTOCOL);
737         goto finish;
738     }
739
740     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
741         goto finish;
742
743     if (s->state != PA_STREAM_READY)
744         goto finish;
745
746     check_smoother_status(s, TRUE, TRUE, FALSE);
747     request_auto_timing_update(s, TRUE);
748
749     if (s->started_callback)
750         s->started_callback(s, s->started_userdata);
751
752 finish:
753     pa_context_unref(c);
754 }
755
756 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
757     pa_context *c = userdata;
758     pa_stream *s;
759     uint32_t channel;
760     pa_proplist *pl = NULL;
761     const char *event;
762
763     pa_assert(pd);
764     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
765     pa_assert(t);
766     pa_assert(c);
767     pa_assert(PA_REFCNT_VALUE(c) >= 1);
768
769     pa_context_ref(c);
770
771     if (c->version < 15) {
772         pa_context_fail(c, PA_ERR_PROTOCOL);
773         goto finish;
774     }
775
776     pl = pa_proplist_new();
777
778     if (pa_tagstruct_getu32(t, &channel) < 0 ||
779         pa_tagstruct_gets(t, &event) < 0 ||
780         pa_tagstruct_get_proplist(t, pl) < 0 ||
781         !pa_tagstruct_eof(t) || !event) {
782         pa_context_fail(c, PA_ERR_PROTOCOL);
783         goto finish;
784     }
785
786     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
787         goto finish;
788
789     if (s->state != PA_STREAM_READY)
790         goto finish;
791
792     if (pa_streq(event, PA_STREAM_EVENT_FORMAT_LOST)) {
793         /* Let client know what the running time was when the stream had to be
794          * killed  */
795         pa_usec_t time;
796         if (pa_stream_get_time(s, &time) == 0)
797             pa_proplist_setf(pl, "stream-time", "%llu", (unsigned long long) time);
798     }
799
800     if (s->event_callback)
801         s->event_callback(s, event, pl, s->event_userdata);
802
803 finish:
804     pa_context_unref(c);
805
806     if (pl)
807         pa_proplist_free(pl);
808 }
809
810 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
811     pa_stream *s;
812     pa_context *c = userdata;
813     uint32_t bytes, channel;
814
815     pa_assert(pd);
816     pa_assert(command == PA_COMMAND_REQUEST);
817     pa_assert(t);
818     pa_assert(c);
819     pa_assert(PA_REFCNT_VALUE(c) >= 1);
820
821     pa_context_ref(c);
822
823     if (pa_tagstruct_getu32(t, &channel) < 0 ||
824         pa_tagstruct_getu32(t, &bytes) < 0 ||
825         !pa_tagstruct_eof(t)) {
826         pa_context_fail(c, PA_ERR_PROTOCOL);
827         goto finish;
828     }
829
830     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
831         goto finish;
832
833     if (s->state != PA_STREAM_READY)
834         goto finish;
835
836     s->requested_bytes += bytes;
837
838     /* pa_log("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes); */
839
840     if (s->requested_bytes > 0 && s->write_callback)
841         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
842
843 finish:
844     pa_context_unref(c);
845 }
846
847 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
848     pa_stream *s;
849     pa_context *c = userdata;
850     uint32_t channel;
851
852     pa_assert(pd);
853     pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
854     pa_assert(t);
855     pa_assert(c);
856     pa_assert(PA_REFCNT_VALUE(c) >= 1);
857
858     pa_context_ref(c);
859
860     if (pa_tagstruct_getu32(t, &channel) < 0 ||
861         !pa_tagstruct_eof(t)) {
862         pa_context_fail(c, PA_ERR_PROTOCOL);
863         goto finish;
864     }
865
866     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
867         goto finish;
868
869     if (s->state != PA_STREAM_READY)
870         goto finish;
871
872     if (s->buffer_attr.prebuf > 0)
873         check_smoother_status(s, TRUE, FALSE, TRUE);
874
875     request_auto_timing_update(s, TRUE);
876
877     if (command == PA_COMMAND_OVERFLOW) {
878         if (s->overflow_callback)
879             s->overflow_callback(s, s->overflow_userdata);
880     } else if (command == PA_COMMAND_UNDERFLOW) {
881         if (s->underflow_callback)
882             s->underflow_callback(s, s->underflow_userdata);
883     }
884
885 finish:
886     pa_context_unref(c);
887 }
888
889 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
890     pa_assert(s);
891     pa_assert(PA_REFCNT_VALUE(s) >= 1);
892
893 /*     pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
894
895     if (s->state != PA_STREAM_READY)
896         return;
897
898     if (w) {
899         s->write_index_not_before = s->context->ctag;
900
901         if (s->timing_info_valid)
902             s->timing_info.write_index_corrupt = TRUE;
903
904 /*         pa_log("write_index invalidated"); */
905     }
906
907     if (r) {
908         s->read_index_not_before = s->context->ctag;
909
910         if (s->timing_info_valid)
911             s->timing_info.read_index_corrupt = TRUE;
912
913 /*         pa_log("read_index invalidated"); */
914     }
915
916     request_auto_timing_update(s, TRUE);
917 }
918
919 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
920     pa_stream *s = userdata;
921
922     pa_assert(s);
923     pa_assert(PA_REFCNT_VALUE(s) >= 1);
924
925     pa_stream_ref(s);
926     request_auto_timing_update(s, FALSE);
927     pa_stream_unref(s);
928 }
929
930 static void create_stream_complete(pa_stream *s) {
931     pa_assert(s);
932     pa_assert(PA_REFCNT_VALUE(s) >= 1);
933     pa_assert(s->state == PA_STREAM_CREATING);
934
935     pa_stream_set_state(s, PA_STREAM_READY);
936
937     if (s->requested_bytes > 0 && s->write_callback)
938         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
939
940     if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
941         s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
942         pa_assert(!s->auto_timing_update_event);
943         s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
944
945         request_auto_timing_update(s, TRUE);
946     }
947
948     check_smoother_status(s, TRUE, FALSE, FALSE);
949 }
950
951 static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
952     const char *e;
953
954     pa_assert(s);
955     pa_assert(attr);
956
957     if ((e = getenv("PULSE_LATENCY_MSEC"))) {
958         uint32_t ms;
959
960         if (pa_atou(e, &ms) < 0 || ms <= 0)
961             pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
962         else {
963             attr->maxlength = (uint32_t) -1;
964             attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &s->sample_spec);
965             attr->minreq = (uint32_t) -1;
966             attr->prebuf = (uint32_t) -1;
967             attr->fragsize = attr->tlength;
968         }
969
970         if (flags)
971             *flags |= PA_STREAM_ADJUST_LATENCY;
972     }
973
974     if (s->context->version >= 13)
975         return;
976
977     /* Version older than 0.9.10 didn't do server side buffer_attr
978      * selection, hence we have to fake it on the client side. */
979
980     /* We choose fairly conservative values here, to not confuse
981      * old clients with extremely large playback buffers */
982
983     if (attr->maxlength == (uint32_t) -1)
984         attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
985
986     if (attr->tlength == (uint32_t) -1)
987         attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */
988
989     if (attr->minreq == (uint32_t) -1)
990         attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
991
992     if (attr->prebuf == (uint32_t) -1)
993         attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
994
995     if (attr->fragsize == (uint32_t) -1)
996         attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
997 }
998
999 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1000     pa_stream *s = userdata;
1001     uint32_t requested_bytes = 0;
1002
1003     pa_assert(pd);
1004     pa_assert(s);
1005     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1006     pa_assert(s->state == PA_STREAM_CREATING);
1007
1008     pa_stream_ref(s);
1009
1010     if (command != PA_COMMAND_REPLY) {
1011         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1012             goto finish;
1013
1014         pa_stream_set_state(s, PA_STREAM_FAILED);
1015         goto finish;
1016     }
1017
1018     if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
1019         s->channel == PA_INVALID_INDEX ||
1020         ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
1021         ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
1022         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1023         goto finish;
1024     }
1025
1026     s->requested_bytes = (int64_t) requested_bytes;
1027
1028     if (s->context->version >= 9) {
1029         if (s->direction == PA_STREAM_PLAYBACK) {
1030             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1031                 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
1032                 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
1033                 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
1034                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1035                 goto finish;
1036             }
1037         } else if (s->direction == PA_STREAM_RECORD) {
1038             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1039                 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
1040                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1041                 goto finish;
1042             }
1043         }
1044     }
1045
1046     if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
1047         pa_sample_spec ss;
1048         pa_channel_map cm;
1049         const char *dn = NULL;
1050         pa_bool_t suspended;
1051
1052         if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1053             pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1054             pa_tagstruct_getu32(t, &s->device_index) < 0 ||
1055             pa_tagstruct_gets(t, &dn) < 0 ||
1056             pa_tagstruct_get_boolean(t, &suspended) < 0) {
1057             pa_context_fail(s->context, PA_ERR_PROTOCOL);
1058             goto finish;
1059         }
1060
1061         if (!dn || s->device_index == PA_INVALID_INDEX ||
1062             ss.channels != cm.channels ||
1063             !pa_channel_map_valid(&cm) ||
1064             !pa_sample_spec_valid(&ss) ||
1065             (s->n_formats == 0 && (
1066                 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
1067                 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
1068                 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))))) {
1069             pa_context_fail(s->context, PA_ERR_PROTOCOL);
1070             goto finish;
1071         }
1072
1073         pa_xfree(s->device_name);
1074         s->device_name = pa_xstrdup(dn);
1075         s->suspended = suspended;
1076
1077         s->channel_map = cm;
1078         s->sample_spec = ss;
1079     }
1080
1081     if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
1082         pa_usec_t usec;
1083
1084         if (pa_tagstruct_get_usec(t, &usec) < 0) {
1085             pa_context_fail(s->context, PA_ERR_PROTOCOL);
1086             goto finish;
1087         }
1088
1089         if (s->direction == PA_STREAM_RECORD)
1090             s->timing_info.configured_source_usec = usec;
1091         else
1092             s->timing_info.configured_sink_usec = usec;
1093     }
1094
1095     if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
1096         || s->context->version >= 22) {
1097
1098         pa_format_info *f = pa_format_info_new();
1099         pa_tagstruct_get_format_info(t, f);
1100
1101         if (pa_format_info_valid(f))
1102             s->format = f;
1103         else {
1104             pa_format_info_free(f);
1105             if (s->n_formats > 0) {
1106                 /* We used the extended API, so we should have got back a proper format */
1107                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1108                 goto finish;
1109             }
1110         }
1111     }
1112
1113     if (!pa_tagstruct_eof(t)) {
1114         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1115         goto finish;
1116     }
1117
1118     if (s->direction == PA_STREAM_RECORD) {
1119         pa_assert(!s->record_memblockq);
1120
1121         s->record_memblockq = pa_memblockq_new(
1122                 0,
1123                 s->buffer_attr.maxlength,
1124                 0,
1125                 pa_frame_size(&s->sample_spec),
1126                 1,
1127                 0,
1128                 0,
1129                 NULL);
1130     }
1131
1132     s->channel_valid = TRUE;
1133     pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);
1134
1135     create_stream_complete(s);
1136
1137 finish:
1138     pa_stream_unref(s);
1139 }
1140
1141 static int create_stream(
1142         pa_stream_direction_t direction,
1143         pa_stream *s,
1144         const char *dev,
1145         const pa_buffer_attr *attr,
1146         pa_stream_flags_t flags,
1147         const pa_cvolume *volume,
1148         pa_stream *sync_stream) {
1149
1150     pa_tagstruct *t;
1151     uint32_t tag;
1152     pa_bool_t volume_set = FALSE;
1153     uint32_t i;
1154
1155     pa_assert(s);
1156     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1157     pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1158
1159     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1160     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1161     PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1162     PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1163                                               PA_STREAM_INTERPOLATE_TIMING|
1164                                               PA_STREAM_NOT_MONOTONIC|
1165                                               PA_STREAM_AUTO_TIMING_UPDATE|
1166                                               PA_STREAM_NO_REMAP_CHANNELS|
1167                                               PA_STREAM_NO_REMIX_CHANNELS|
1168                                               PA_STREAM_FIX_FORMAT|
1169                                               PA_STREAM_FIX_RATE|
1170                                               PA_STREAM_FIX_CHANNELS|
1171                                               PA_STREAM_DONT_MOVE|
1172                                               PA_STREAM_VARIABLE_RATE|
1173                                               PA_STREAM_PEAK_DETECT|
1174                                               PA_STREAM_START_MUTED|
1175                                               PA_STREAM_ADJUST_LATENCY|
1176                                               PA_STREAM_EARLY_REQUESTS|
1177                                               PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1178                                               PA_STREAM_START_UNMUTED|
1179                                               PA_STREAM_FAIL_ON_SUSPEND|
1180                                               PA_STREAM_RELATIVE_VOLUME|
1181                                               PA_STREAM_PASSTHROUGH)), PA_ERR_INVALID);
1182
1183
1184     PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1185     PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1186     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1187     /* Althought some of the other flags are not supported on older
1188      * version, we don't check for them here, because it doesn't hurt
1189      * when they are passed but actually not supported. This makes
1190      * client development easier */
1191
1192     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1193     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1194     PA_CHECK_VALIDITY(s->context, !volume || (pa_sample_spec_valid(&s->sample_spec) && volume->channels == s->sample_spec.channels), PA_ERR_INVALID);
1195     PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1196     PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1197
1198     pa_stream_ref(s);
1199
1200     s->direction = direction;
1201
1202     if (sync_stream)
1203         s->syncid = sync_stream->syncid;
1204
1205     if (attr)
1206         s->buffer_attr = *attr;
1207     patch_buffer_attr(s, &s->buffer_attr, &flags);
1208
1209     s->flags = flags;
1210     s->corked = !!(flags & PA_STREAM_START_CORKED);
1211
1212     if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1213         pa_usec_t x;
1214
1215         x = pa_rtclock_now();
1216
1217         pa_assert(!s->smoother);
1218         s->smoother = pa_smoother_new(
1219                 SMOOTHER_ADJUST_TIME,
1220                 SMOOTHER_HISTORY_TIME,
1221                 !(flags & PA_STREAM_NOT_MONOTONIC),
1222                 TRUE,
1223                 SMOOTHER_MIN_HISTORY,
1224                 x,
1225                 TRUE);
1226     }
1227
1228     if (!dev)
1229         dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1230
1231     t = pa_tagstruct_command(
1232             s->context,
1233             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1234             &tag);
1235
1236     if (s->context->version < 13)
1237         pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1238
1239     pa_tagstruct_put(
1240             t,
1241             PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1242             PA_TAG_CHANNEL_MAP, &s->channel_map,
1243             PA_TAG_U32, PA_INVALID_INDEX,
1244             PA_TAG_STRING, dev,
1245             PA_TAG_U32, s->buffer_attr.maxlength,
1246             PA_TAG_BOOLEAN, s->corked,
1247             PA_TAG_INVALID);
1248
1249     if (s->direction == PA_STREAM_PLAYBACK) {
1250         pa_cvolume cv;
1251
1252         pa_tagstruct_put(
1253                 t,
1254                 PA_TAG_U32, s->buffer_attr.tlength,
1255                 PA_TAG_U32, s->buffer_attr.prebuf,
1256                 PA_TAG_U32, s->buffer_attr.minreq,
1257                 PA_TAG_U32, s->syncid,
1258                 PA_TAG_INVALID);
1259
1260         volume_set = !!volume;
1261
1262         if (!volume) {
1263             if (pa_sample_spec_valid(&s->sample_spec))
1264                 volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1265             else {
1266                 /* This is not really relevant, since no volume was set, and
1267                  * the real number of channels is embedded in the format_info
1268                  * structure */
1269                 volume = pa_cvolume_reset(&cv, PA_CHANNELS_MAX);
1270             }
1271         }
1272
1273         pa_tagstruct_put_cvolume(t, volume);
1274     } else
1275         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1276
1277     if (s->context->version >= 12) {
1278         pa_tagstruct_put(
1279                 t,
1280                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1281                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1282                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1283                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1284                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1285                 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1286                 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1287                 PA_TAG_INVALID);
1288     }
1289
1290     if (s->context->version >= 13) {
1291
1292         if (s->direction == PA_STREAM_PLAYBACK)
1293             pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1294         else
1295             pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1296
1297         pa_tagstruct_put(
1298                 t,
1299                 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1300                 PA_TAG_PROPLIST, s->proplist,
1301                 PA_TAG_INVALID);
1302
1303         if (s->direction == PA_STREAM_RECORD)
1304             pa_tagstruct_putu32(t, s->direct_on_input);
1305     }
1306
1307     if (s->context->version >= 14) {
1308
1309         if (s->direction == PA_STREAM_PLAYBACK)
1310             pa_tagstruct_put_boolean(t, volume_set);
1311
1312         pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1313     }
1314
1315     if (s->context->version >= 15) {
1316
1317         if (s->direction == PA_STREAM_PLAYBACK)
1318             pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1319
1320         pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1321         pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1322     }
1323
1324     if (s->context->version >= 17 && s->direction == PA_STREAM_PLAYBACK)
1325         pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1326
1327     if (s->context->version >= 18 && s->direction == PA_STREAM_PLAYBACK)
1328         pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1329
1330     if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
1331         || s->context->version >= 22) {
1332
1333         pa_tagstruct_putu8(t, s->n_formats);
1334         for (i = 0; i < s->n_formats; i++)
1335             pa_tagstruct_put_format_info(t, s->req_formats[i]);
1336     }
1337
1338     pa_pstream_send_tagstruct(s->context->pstream, t);
1339     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1340
1341     pa_stream_set_state(s, PA_STREAM_CREATING);
1342
1343     pa_stream_unref(s);
1344     return 0;
1345 }
1346
1347 int pa_stream_connect_playback(
1348         pa_stream *s,
1349         const char *dev,
1350         const pa_buffer_attr *attr,
1351         pa_stream_flags_t flags,
1352         const pa_cvolume *volume,
1353         pa_stream *sync_stream) {
1354
1355     pa_assert(s);
1356     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1357
1358     return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1359 }
1360
1361 int pa_stream_connect_record(
1362         pa_stream *s,
1363         const char *dev,
1364         const pa_buffer_attr *attr,
1365         pa_stream_flags_t flags) {
1366
1367     pa_assert(s);
1368     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1369
1370     return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1371 }
1372
1373 int pa_stream_begin_write(
1374         pa_stream *s,
1375         void **data,
1376         size_t *nbytes) {
1377
1378     pa_assert(s);
1379     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1380
1381     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1382     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1383     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1384     PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1385     PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1386
1387     if (*nbytes != (size_t) -1) {
1388         size_t m, fs;
1389
1390         m = pa_mempool_block_size_max(s->context->mempool);
1391         fs = pa_frame_size(&s->sample_spec);
1392
1393         m = (m / fs) * fs;
1394         if (*nbytes > m)
1395             *nbytes = m;
1396     }
1397
1398     if (!s->write_memblock) {
1399         s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1400         s->write_data = pa_memblock_acquire(s->write_memblock);
1401     }
1402
1403     *data = s->write_data;
1404     *nbytes = pa_memblock_get_length(s->write_memblock);
1405
1406     return 0;
1407 }
1408
1409 int pa_stream_cancel_write(
1410         pa_stream *s) {
1411
1412     pa_assert(s);
1413     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1414
1415     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1416     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1417     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1418     PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1419
1420     pa_assert(s->write_data);
1421
1422     pa_memblock_release(s->write_memblock);
1423     pa_memblock_unref(s->write_memblock);
1424     s->write_memblock = NULL;
1425     s->write_data = NULL;
1426
1427     return 0;
1428 }
1429
1430 int pa_stream_write(
1431         pa_stream *s,
1432         const void *data,
1433         size_t length,
1434         pa_free_cb_t free_cb,
1435         int64_t offset,
1436         pa_seek_mode_t seek) {
1437
1438     pa_assert(s);
1439     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1440     pa_assert(data);
1441
1442     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1443     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1444     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1445     PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1446     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1447     PA_CHECK_VALIDITY(s->context,
1448                       !s->write_memblock ||
1449                       ((data >= s->write_data) &&
1450                        ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1451                       PA_ERR_INVALID);
1452     PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1453
1454     if (s->write_memblock) {
1455         pa_memchunk chunk;
1456
1457         /* pa_stream_write_begin() was called before */
1458
1459         pa_memblock_release(s->write_memblock);
1460
1461         chunk.memblock = s->write_memblock;
1462         chunk.index = (const char *) data - (const char *) s->write_data;
1463         chunk.length = length;
1464
1465         s->write_memblock = NULL;
1466         s->write_data = NULL;
1467
1468         pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1469         pa_memblock_unref(chunk.memblock);
1470
1471     } else {
1472         pa_seek_mode_t t_seek = seek;
1473         int64_t t_offset = offset;
1474         size_t t_length = length;
1475         const void *t_data = data;
1476
1477         /* pa_stream_write_begin() was not called before */
1478
1479         while (t_length > 0) {
1480             pa_memchunk chunk;
1481
1482             chunk.index = 0;
1483
1484             if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1485                 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1486                 chunk.length = t_length;
1487             } else {
1488                 void *d;
1489
1490                 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1491                 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1492
1493                 d = pa_memblock_acquire(chunk.memblock);
1494                 memcpy(d, t_data, chunk.length);
1495                 pa_memblock_release(chunk.memblock);
1496             }
1497
1498             pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1499
1500             t_offset = 0;
1501             t_seek = PA_SEEK_RELATIVE;
1502
1503             t_data = (const uint8_t*) t_data + chunk.length;
1504             t_length -= chunk.length;
1505
1506             pa_memblock_unref(chunk.memblock);
1507         }
1508
1509         if (free_cb && pa_pstream_get_shm(s->context->pstream))
1510             free_cb((void*) data);
1511     }
1512
1513     /* This is obviously wrong since we ignore the seeking index . But
1514      * that's OK, the server side applies the same error */
1515     s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1516
1517     /* pa_log("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes); */
1518
1519     if (s->direction == PA_STREAM_PLAYBACK) {
1520
1521         /* Update latency request correction */
1522         if (s->write_index_corrections[s->current_write_index_correction].valid) {
1523
1524             if (seek == PA_SEEK_ABSOLUTE) {
1525                 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1526                 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1527                 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1528             } else if (seek == PA_SEEK_RELATIVE) {
1529                 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1530                     s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1531             } else
1532                 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1533         }
1534
1535         /* Update the write index in the already available latency data */
1536         if (s->timing_info_valid) {
1537
1538             if (seek == PA_SEEK_ABSOLUTE) {
1539                 s->timing_info.write_index_corrupt = FALSE;
1540                 s->timing_info.write_index = offset + (int64_t) length;
1541             } else if (seek == PA_SEEK_RELATIVE) {
1542                 if (!s->timing_info.write_index_corrupt)
1543                     s->timing_info.write_index += offset + (int64_t) length;
1544             } else
1545                 s->timing_info.write_index_corrupt = TRUE;
1546         }
1547
1548         if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1549             request_auto_timing_update(s, TRUE);
1550     }
1551
1552     return 0;
1553 }
1554
1555 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1556     pa_assert(s);
1557     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1558     pa_assert(data);
1559     pa_assert(length);
1560
1561     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1562     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1563     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1564
1565     if (!s->peek_memchunk.memblock) {
1566
1567         if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1568             *data = NULL;
1569             *length = 0;
1570             return 0;
1571         }
1572
1573         s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1574     }
1575
1576     pa_assert(s->peek_data);
1577     *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1578     *length = s->peek_memchunk.length;
1579     return 0;
1580 }
1581
1582 int pa_stream_drop(pa_stream *s) {
1583     pa_assert(s);
1584     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1585
1586     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1587     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1588     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1589     PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1590
1591     pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1592
1593     /* Fix the simulated local read index */
1594     if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1595         s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1596
1597     pa_assert(s->peek_data);
1598     pa_memblock_release(s->peek_memchunk.memblock);
1599     pa_memblock_unref(s->peek_memchunk.memblock);
1600     pa_memchunk_reset(&s->peek_memchunk);
1601
1602     return 0;
1603 }
1604
1605 size_t pa_stream_writable_size(pa_stream *s) {
1606     pa_assert(s);
1607     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1608
1609     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1610     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1611     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1612
1613     return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1614 }
1615
1616 size_t pa_stream_readable_size(pa_stream *s) {
1617     pa_assert(s);
1618     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1619
1620     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1621     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1622     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1623
1624     return pa_memblockq_get_length(s->record_memblockq);
1625 }
1626
1627 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1628     pa_operation *o;
1629     pa_tagstruct *t;
1630     uint32_t tag;
1631
1632     pa_assert(s);
1633     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1634
1635     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1636     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1637     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1638
1639     /* Ask for a timing update before we cork/uncork to get the best
1640      * accuracy for the transport latency suitable for the
1641      * check_smoother_status() call in the started callback */
1642     request_auto_timing_update(s, TRUE);
1643
1644     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1645
1646     t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1647     pa_tagstruct_putu32(t, s->channel);
1648     pa_pstream_send_tagstruct(s->context->pstream, t);
1649     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1650
1651     /* This might cause the read index to conitnue again, hence
1652      * let's request a timing update */
1653     request_auto_timing_update(s, TRUE);
1654
1655     return o;
1656 }
1657
1658 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1659     pa_usec_t usec;
1660
1661     pa_assert(s);
1662     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1663     pa_assert(s->state == PA_STREAM_READY);
1664     pa_assert(s->direction != PA_STREAM_UPLOAD);
1665     pa_assert(s->timing_info_valid);
1666     pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1667     pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1668
1669     if (s->direction == PA_STREAM_PLAYBACK) {
1670         /* The last byte that was written into the output device
1671          * had this time value associated */
1672         usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1673
1674         if (!s->corked && !s->suspended) {
1675
1676             if (!ignore_transport)
1677                 /* Because the latency info took a little time to come
1678                  * to us, we assume that the real output time is actually
1679                  * a little ahead */
1680                 usec += s->timing_info.transport_usec;
1681
1682             /* However, the output device usually maintains a buffer
1683                too, hence the real sample currently played is a little
1684                back  */
1685             if (s->timing_info.sink_usec >= usec)
1686                 usec = 0;
1687             else
1688                 usec -= s->timing_info.sink_usec;
1689         }
1690
1691     } else {
1692         pa_assert(s->direction == PA_STREAM_RECORD);
1693
1694         /* The last byte written into the server side queue had
1695          * this time value associated */
1696         usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1697
1698         if (!s->corked && !s->suspended) {
1699
1700             if (!ignore_transport)
1701                 /* Add transport latency */
1702                 usec += s->timing_info.transport_usec;
1703
1704             /* Add latency of data in device buffer */
1705             usec += s->timing_info.source_usec;
1706
1707             /* If this is a monitor source, we need to correct the
1708              * time by the playback device buffer */
1709             if (s->timing_info.sink_usec >= usec)
1710                 usec = 0;
1711             else
1712                 usec -= s->timing_info.sink_usec;
1713         }
1714     }
1715
1716     return usec;
1717 }
1718
1719 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1720     pa_operation *o = userdata;
1721     struct timeval local, remote, now;
1722     pa_timing_info *i;
1723     pa_bool_t playing = FALSE;
1724     uint64_t underrun_for = 0, playing_for = 0;
1725
1726     pa_assert(pd);
1727     pa_assert(o);
1728     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1729
1730     if (!o->context || !o->stream)
1731         goto finish;
1732
1733     i = &o->stream->timing_info;
1734
1735     o->stream->timing_info_valid = FALSE;
1736     i->write_index_corrupt = TRUE;
1737     i->read_index_corrupt = TRUE;
1738
1739     if (command != PA_COMMAND_REPLY) {
1740         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1741             goto finish;
1742
1743     } else {
1744
1745         if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1746             pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1747             pa_tagstruct_get_boolean(t, &playing) < 0 ||
1748             pa_tagstruct_get_timeval(t, &local) < 0 ||
1749             pa_tagstruct_get_timeval(t, &remote) < 0 ||
1750             pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1751             pa_tagstruct_gets64(t, &i->read_index) < 0) {
1752
1753             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1754             goto finish;
1755         }
1756
1757         if (o->context->version >= 13 &&
1758             o->stream->direction == PA_STREAM_PLAYBACK)
1759             if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1760                 pa_tagstruct_getu64(t, &playing_for) < 0) {
1761
1762                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1763                 goto finish;
1764             }
1765
1766
1767         if (!pa_tagstruct_eof(t)) {
1768             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1769             goto finish;
1770         }
1771         o->stream->timing_info_valid = TRUE;
1772         i->write_index_corrupt = FALSE;
1773         i->read_index_corrupt = FALSE;
1774
1775         i->playing = (int) playing;
1776         i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1777
1778         pa_gettimeofday(&now);
1779
1780         /* Calculcate timestamps */
1781         if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1782             /* local and remote seem to have synchronized clocks */
1783
1784             if (o->stream->direction == PA_STREAM_PLAYBACK)
1785                 i->transport_usec = pa_timeval_diff(&remote, &local);
1786             else
1787                 i->transport_usec = pa_timeval_diff(&now, &remote);
1788
1789             i->synchronized_clocks = TRUE;
1790             i->timestamp = remote;
1791         } else {
1792             /* clocks are not synchronized, let's estimate latency then */
1793             i->transport_usec = pa_timeval_diff(&now, &local)/2;
1794             i->synchronized_clocks = FALSE;
1795             i->timestamp = local;
1796             pa_timeval_add(&i->timestamp, i->transport_usec);
1797         }
1798
1799         /* Invalidate read and write indexes if necessary */
1800         if (tag < o->stream->read_index_not_before)
1801             i->read_index_corrupt = TRUE;
1802
1803         if (tag < o->stream->write_index_not_before)
1804             i->write_index_corrupt = TRUE;
1805
1806         if (o->stream->direction == PA_STREAM_PLAYBACK) {
1807             /* Write index correction */
1808
1809             int n, j;
1810             uint32_t ctag = tag;
1811
1812             /* Go through the saved correction values and add up the
1813              * total correction.*/
1814             for (n = 0, j = o->stream->current_write_index_correction+1;
1815                  n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1816                  n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1817
1818                 /* Step over invalid data or out-of-date data */
1819                 if (!o->stream->write_index_corrections[j].valid ||
1820                     o->stream->write_index_corrections[j].tag < ctag)
1821                     continue;
1822
1823                 /* Make sure that everything is in order */
1824                 ctag = o->stream->write_index_corrections[j].tag+1;
1825
1826                 /* Now fix the write index */
1827                 if (o->stream->write_index_corrections[j].corrupt) {
1828                     /* A corrupting seek was made */
1829                     i->write_index_corrupt = TRUE;
1830                 } else if (o->stream->write_index_corrections[j].absolute) {
1831                     /* An absolute seek was made */
1832                     i->write_index = o->stream->write_index_corrections[j].value;
1833                     i->write_index_corrupt = FALSE;
1834                 } else if (!i->write_index_corrupt) {
1835                     /* A relative seek was made */
1836                     i->write_index += o->stream->write_index_corrections[j].value;
1837                 }
1838             }
1839
1840             /* Clear old correction entries */
1841             for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1842                 if (!o->stream->write_index_corrections[n].valid)
1843                     continue;
1844
1845                 if (o->stream->write_index_corrections[n].tag <= tag)
1846                     o->stream->write_index_corrections[n].valid = FALSE;
1847             }
1848         }
1849
1850         if (o->stream->direction == PA_STREAM_RECORD) {
1851             /* Read index correction */
1852
1853             if (!i->read_index_corrupt)
1854                 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1855         }
1856
1857         /* Update smoother if we're not corked */
1858         if (o->stream->smoother && !o->stream->corked) {
1859             pa_usec_t u, x;
1860
1861             u = x = pa_rtclock_now() - i->transport_usec;
1862
1863             if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1864                 pa_usec_t su;
1865
1866                 /* If we weren't playing then it will take some time
1867                  * until the audio will actually come out through the
1868                  * speakers. Since we follow that timing here, we need
1869                  * to try to fix this up */
1870
1871                 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1872
1873                 if (su < i->sink_usec)
1874                     x += i->sink_usec - su;
1875             }
1876
1877             if (!i->playing)
1878                 pa_smoother_pause(o->stream->smoother, x);
1879
1880             /* Update the smoother */
1881             if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1882                 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1883                 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1884
1885             if (i->playing)
1886                 pa_smoother_resume(o->stream->smoother, x, TRUE);
1887         }
1888     }
1889
1890     o->stream->auto_timing_update_requested = FALSE;
1891
1892     if (o->stream->latency_update_callback)
1893         o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1894
1895     if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1896         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1897         cb(o->stream, o->stream->timing_info_valid, o->userdata);
1898     }
1899
1900 finish:
1901
1902     pa_operation_done(o);
1903     pa_operation_unref(o);
1904 }
1905
1906 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1907     uint32_t tag;
1908     pa_operation *o;
1909     pa_tagstruct *t;
1910     struct timeval now;
1911     int cidx = 0;
1912
1913     pa_assert(s);
1914     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1915
1916     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1917     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1918     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1919
1920     if (s->direction == PA_STREAM_PLAYBACK) {
1921         /* Find a place to store the write_index correction data for this entry */
1922         cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1923
1924         /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1925         PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1926     }
1927     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1928
1929     t = pa_tagstruct_command(
1930             s->context,
1931             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1932             &tag);
1933     pa_tagstruct_putu32(t, s->channel);
1934     pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1935
1936     pa_pstream_send_tagstruct(s->context->pstream, t);
1937     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1938
1939     if (s->direction == PA_STREAM_PLAYBACK) {
1940         /* Fill in initial correction data */
1941
1942         s->current_write_index_correction = cidx;
1943
1944         s->write_index_corrections[cidx].valid = TRUE;
1945         s->write_index_corrections[cidx].absolute = FALSE;
1946         s->write_index_corrections[cidx].corrupt = FALSE;
1947         s->write_index_corrections[cidx].tag = tag;
1948         s->write_index_corrections[cidx].value = 0;
1949     }
1950
1951     return o;
1952 }
1953
1954 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1955     pa_stream *s = userdata;
1956
1957     pa_assert(pd);
1958     pa_assert(s);
1959     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1960
1961     pa_stream_ref(s);
1962
1963     if (command != PA_COMMAND_REPLY) {
1964         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1965             goto finish;
1966
1967         pa_stream_set_state(s, PA_STREAM_FAILED);
1968         goto finish;
1969     } else if (!pa_tagstruct_eof(t)) {
1970         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1971         goto finish;
1972     }
1973
1974     pa_stream_set_state(s, PA_STREAM_TERMINATED);
1975
1976 finish:
1977     pa_stream_unref(s);
1978 }
1979
1980 int pa_stream_disconnect(pa_stream *s) {
1981     pa_tagstruct *t;
1982     uint32_t tag;
1983
1984     pa_assert(s);
1985     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1986
1987     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1988     PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1989     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1990
1991     pa_stream_ref(s);
1992
1993     t = pa_tagstruct_command(
1994             s->context,
1995             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1996                         (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1997             &tag);
1998     pa_tagstruct_putu32(t, s->channel);
1999     pa_pstream_send_tagstruct(s->context->pstream, t);
2000     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
2001
2002     pa_stream_unref(s);
2003     return 0;
2004 }
2005
2006 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2007     pa_assert(s);
2008     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2009
2010     if (pa_detect_fork())
2011         return;
2012
2013     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2014         return;
2015
2016     s->read_callback = cb;
2017     s->read_userdata = userdata;
2018 }
2019
2020 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2021     pa_assert(s);
2022     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2023
2024     if (pa_detect_fork())
2025         return;
2026
2027     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2028         return;
2029
2030     s->write_callback = cb;
2031     s->write_userdata = userdata;
2032 }
2033
2034 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2035     pa_assert(s);
2036     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2037
2038     if (pa_detect_fork())
2039         return;
2040
2041     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2042         return;
2043
2044     s->state_callback = cb;
2045     s->state_userdata = userdata;
2046 }
2047
2048 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2049     pa_assert(s);
2050     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2051
2052     if (pa_detect_fork())
2053         return;
2054
2055     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2056         return;
2057
2058     s->overflow_callback = cb;
2059     s->overflow_userdata = userdata;
2060 }
2061
2062 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2063     pa_assert(s);
2064     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2065
2066     if (pa_detect_fork())
2067         return;
2068
2069     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2070         return;
2071
2072     s->underflow_callback = cb;
2073     s->underflow_userdata = userdata;
2074 }
2075
2076 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2077     pa_assert(s);
2078     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2079
2080     if (pa_detect_fork())
2081         return;
2082
2083     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2084         return;
2085
2086     s->latency_update_callback = cb;
2087     s->latency_update_userdata = userdata;
2088 }
2089
2090 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2091     pa_assert(s);
2092     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2093
2094     if (pa_detect_fork())
2095         return;
2096
2097     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2098         return;
2099
2100     s->moved_callback = cb;
2101     s->moved_userdata = userdata;
2102 }
2103
2104 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2105     pa_assert(s);
2106     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2107
2108     if (pa_detect_fork())
2109         return;
2110
2111     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2112         return;
2113
2114     s->suspended_callback = cb;
2115     s->suspended_userdata = userdata;
2116 }
2117
2118 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2119     pa_assert(s);
2120     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2121
2122     if (pa_detect_fork())
2123         return;
2124
2125     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2126         return;
2127
2128     s->started_callback = cb;
2129     s->started_userdata = userdata;
2130 }
2131
2132 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
2133     pa_assert(s);
2134     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2135
2136     if (pa_detect_fork())
2137         return;
2138
2139     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2140         return;
2141
2142     s->event_callback = cb;
2143     s->event_userdata = userdata;
2144 }
2145
2146 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2147     pa_assert(s);
2148     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2149
2150     if (pa_detect_fork())
2151         return;
2152
2153     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2154         return;
2155
2156     s->buffer_attr_callback = cb;
2157     s->buffer_attr_userdata = userdata;
2158 }
2159
2160 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2161     pa_operation *o = userdata;
2162     int success = 1;
2163
2164     pa_assert(pd);
2165     pa_assert(o);
2166     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2167
2168     if (!o->context)
2169         goto finish;
2170
2171     if (command != PA_COMMAND_REPLY) {
2172         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2173             goto finish;
2174
2175         success = 0;
2176     } else if (!pa_tagstruct_eof(t)) {
2177         pa_context_fail(o->context, PA_ERR_PROTOCOL);
2178         goto finish;
2179     }
2180
2181     if (o->callback) {
2182         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2183         cb(o->stream, success, o->userdata);
2184     }
2185
2186 finish:
2187     pa_operation_done(o);
2188     pa_operation_unref(o);
2189 }
2190
2191 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2192     pa_operation *o;
2193     pa_tagstruct *t;
2194     uint32_t tag;
2195
2196     pa_assert(s);
2197     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2198
2199     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2200     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2201     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2202
2203     /* Ask for a timing update before we cork/uncork to get the best
2204      * accuracy for the transport latency suitable for the
2205      * check_smoother_status() call in the started callback */
2206     request_auto_timing_update(s, TRUE);
2207
2208     s->corked = b;
2209
2210     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2211
2212     t = pa_tagstruct_command(
2213             s->context,
2214             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2215             &tag);
2216     pa_tagstruct_putu32(t, s->channel);
2217     pa_tagstruct_put_boolean(t, !!b);
2218     pa_pstream_send_tagstruct(s->context->pstream, t);
2219     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2220
2221     check_smoother_status(s, FALSE, FALSE, FALSE);
2222
2223     /* This might cause the indexes to hang/start again, hence let's
2224      * request a timing update, after the cork/uncork, too */
2225     request_auto_timing_update(s, TRUE);
2226
2227     return o;
2228 }
2229
2230 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2231     pa_tagstruct *t;
2232     pa_operation *o;
2233     uint32_t tag;
2234
2235     pa_assert(s);
2236     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2237
2238     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2239     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2240
2241     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2242
2243     t = pa_tagstruct_command(s->context, command, &tag);
2244     pa_tagstruct_putu32(t, s->channel);
2245     pa_pstream_send_tagstruct(s->context->pstream, t);
2246     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2247
2248     return o;
2249 }
2250
2251 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2252     pa_operation *o;
2253
2254     pa_assert(s);
2255     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2256
2257     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2258     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2259     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2260
2261     /* Ask for a timing update *before* the flush, so that the
2262      * transport usec is as up to date as possible when we get the
2263      * underflow message and update the smoother status*/
2264     request_auto_timing_update(s, TRUE);
2265
2266     if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2267         return NULL;
2268
2269     if (s->direction == PA_STREAM_PLAYBACK) {
2270
2271         if (s->write_index_corrections[s->current_write_index_correction].valid)
2272             s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2273
2274         if (s->buffer_attr.prebuf > 0)
2275             check_smoother_status(s, FALSE, FALSE, TRUE);
2276
2277         /* This will change the write index, but leave the
2278          * read index untouched. */
2279         invalidate_indexes(s, FALSE, TRUE);
2280
2281     } else
2282         /* For record streams this has no influence on the write
2283          * index, but the read index might jump. */
2284         invalidate_indexes(s, TRUE, FALSE);
2285
2286     /* Note that we do not update requested_bytes here. This is
2287      * because we cannot really know how data actually was dropped
2288      * from the write index due to this. This 'error' will be applied
2289      * by both client and server and hence we should be fine. */
2290
2291     return o;
2292 }
2293
2294 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2295     pa_operation *o;
2296
2297     pa_assert(s);
2298     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2299
2300     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2301     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2302     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2303     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2304
2305     /* Ask for a timing update before we cork/uncork to get the best
2306      * accuracy for the transport latency suitable for the
2307      * check_smoother_status() call in the started callback */
2308     request_auto_timing_update(s, TRUE);
2309
2310     if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2311         return NULL;
2312
2313     /* This might cause the read index to hang again, hence
2314      * let's request a timing update */
2315     request_auto_timing_update(s, TRUE);
2316
2317     return o;
2318 }
2319
2320 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2321     pa_operation *o;
2322
2323     pa_assert(s);
2324     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2325
2326     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2327     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2328     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2329     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2330
2331     /* Ask for a timing update before we cork/uncork to get the best
2332      * accuracy for the transport latency suitable for the
2333      * check_smoother_status() call in the started callback */
2334     request_auto_timing_update(s, TRUE);
2335
2336     if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2337         return NULL;
2338
2339     /* This might cause the read index to start moving again, hence
2340      * let's request a timing update */
2341     request_auto_timing_update(s, TRUE);
2342
2343     return o;
2344 }
2345
2346 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2347     pa_operation *o;
2348
2349     pa_assert(s);
2350     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2351     pa_assert(name);
2352
2353     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2354     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2355     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2356
2357     if (s->context->version >= 13) {
2358         pa_proplist *p = pa_proplist_new();
2359
2360         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2361         o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2362         pa_proplist_free(p);
2363     } else {
2364         pa_tagstruct *t;
2365         uint32_t tag;
2366
2367         o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2368         t = pa_tagstruct_command(
2369                 s->context,
2370                 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2371                 &tag);
2372         pa_tagstruct_putu32(t, s->channel);
2373         pa_tagstruct_puts(t, name);
2374         pa_pstream_send_tagstruct(s->context->pstream, t);
2375         pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2376     }
2377
2378     return o;
2379 }
2380
2381 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2382     pa_usec_t usec;
2383
2384     pa_assert(s);
2385     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2386
2387     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2388     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2389     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2390     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2391     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2392     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2393
2394     if (s->smoother)
2395         usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2396     else
2397         usec = calc_time(s, FALSE);
2398
2399     /* Make sure the time runs monotonically */
2400     if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2401         if (usec < s->previous_time)
2402             usec = s->previous_time;
2403         else
2404             s->previous_time = usec;
2405     }
2406
2407     if (r_usec)
2408         *r_usec = usec;
2409
2410     return 0;
2411 }
2412
2413 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2414     pa_assert(s);
2415     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2416
2417     if (negative)
2418         *negative = 0;
2419
2420     if (a >= b)
2421         return a-b;
2422     else {
2423         if (negative && s->direction == PA_STREAM_RECORD) {
2424             *negative = 1;
2425             return b-a;
2426         } else
2427             return 0;
2428     }
2429 }
2430
2431 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2432     pa_usec_t t, c;
2433     int r;
2434     int64_t cindex;
2435
2436     pa_assert(s);
2437     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2438     pa_assert(r_usec);
2439
2440     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2441     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2442     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2443     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2444     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2445     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2446
2447     if ((r = pa_stream_get_time(s, &t)) < 0)
2448         return r;
2449
2450     if (s->direction == PA_STREAM_PLAYBACK)
2451         cindex = s->timing_info.write_index;
2452     else
2453         cindex = s->timing_info.read_index;
2454
2455     if (cindex < 0)
2456         cindex = 0;
2457
2458     c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2459
2460     if (s->direction == PA_STREAM_PLAYBACK)
2461         *r_usec = time_counter_diff(s, c, t, negative);
2462     else
2463         *r_usec = time_counter_diff(s, t, c, negative);
2464
2465     return 0;
2466 }
2467
2468 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2469     pa_assert(s);
2470     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2471
2472     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2473     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2474     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2475     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2476
2477     return &s->timing_info;
2478 }
2479
2480 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2481     pa_assert(s);
2482     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2483
2484     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2485
2486     return &s->sample_spec;
2487 }
2488
2489 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2490     pa_assert(s);
2491     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2492
2493     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2494
2495     return &s->channel_map;
2496 }
2497
2498 const pa_format_info* pa_stream_get_format_info(pa_stream *s) {
2499     pa_assert(s);
2500     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2501
2502     /* We don't have the format till routing is done */
2503     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2504     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2505
2506     return s->format;
2507 }
2508 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2509     pa_assert(s);
2510     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2511
2512     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2513     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2514     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2515
2516     return &s->buffer_attr;
2517 }
2518
2519 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2520     pa_operation *o = userdata;
2521     int success = 1;
2522
2523     pa_assert(pd);
2524     pa_assert(o);
2525     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2526
2527     if (!o->context)
2528         goto finish;
2529
2530     if (command != PA_COMMAND_REPLY) {
2531         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2532             goto finish;
2533
2534         success = 0;
2535     } else {
2536         if (o->stream->direction == PA_STREAM_PLAYBACK) {
2537             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2538                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2539                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2540                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2541                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2542                 goto finish;
2543             }
2544         } else if (o->stream->direction == PA_STREAM_RECORD) {
2545             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2546                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2547                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2548                 goto finish;
2549             }
2550         }
2551
2552         if (o->stream->context->version >= 13) {
2553             pa_usec_t usec;
2554
2555             if (pa_tagstruct_get_usec(t, &usec) < 0) {
2556                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2557                 goto finish;
2558             }
2559
2560             if (o->stream->direction == PA_STREAM_RECORD)
2561                 o->stream->timing_info.configured_source_usec = usec;
2562             else
2563                 o->stream->timing_info.configured_sink_usec = usec;
2564         }
2565
2566         if (!pa_tagstruct_eof(t)) {
2567             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2568             goto finish;
2569         }
2570     }
2571
2572     if (o->callback) {
2573         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2574         cb(o->stream, success, o->userdata);
2575     }
2576
2577 finish:
2578     pa_operation_done(o);
2579     pa_operation_unref(o);
2580 }
2581
2582
2583 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2584     pa_operation *o;
2585     pa_tagstruct *t;
2586     uint32_t tag;
2587     pa_buffer_attr copy;
2588
2589     pa_assert(s);
2590     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2591     pa_assert(attr);
2592
2593     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2594     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2595     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2596     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2597
2598     /* Ask for a timing update before we cork/uncork to get the best
2599      * accuracy for the transport latency suitable for the
2600      * check_smoother_status() call in the started callback */
2601     request_auto_timing_update(s, TRUE);
2602
2603     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2604
2605     t = pa_tagstruct_command(
2606             s->context,
2607             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2608             &tag);
2609     pa_tagstruct_putu32(t, s->channel);
2610
2611     copy = *attr;
2612     patch_buffer_attr(s, &copy, NULL);
2613     attr = &copy;
2614
2615     pa_tagstruct_putu32(t, attr->maxlength);
2616
2617     if (s->direction == PA_STREAM_PLAYBACK)
2618         pa_tagstruct_put(
2619                 t,
2620                 PA_TAG_U32, attr->tlength,
2621                 PA_TAG_U32, attr->prebuf,
2622                 PA_TAG_U32, attr->minreq,
2623                 PA_TAG_INVALID);
2624     else
2625         pa_tagstruct_putu32(t, attr->fragsize);
2626
2627     if (s->context->version >= 13)
2628         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2629
2630     if (s->context->version >= 14)
2631         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2632
2633     pa_pstream_send_tagstruct(s->context->pstream, t);
2634     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2635
2636     /* This might cause changes in the read/write indexex, hence let's
2637      * request a timing update */
2638     request_auto_timing_update(s, TRUE);
2639
2640     return o;
2641 }
2642
2643 uint32_t pa_stream_get_device_index(pa_stream *s) {
2644     pa_assert(s);
2645     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2646
2647     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2648     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2649     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2650     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2651     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2652
2653     return s->device_index;
2654 }
2655
2656 const char *pa_stream_get_device_name(pa_stream *s) {
2657     pa_assert(s);
2658     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2659
2660     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2661     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2662     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2663     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2664     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2665
2666     return s->device_name;
2667 }
2668
2669 int pa_stream_is_suspended(pa_stream *s) {
2670     pa_assert(s);
2671     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2672
2673     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2674     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2675     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2676     PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2677
2678     return s->suspended;
2679 }
2680
2681 int pa_stream_is_corked(pa_stream *s) {
2682     pa_assert(s);
2683     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2684
2685     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2686     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2687     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2688
2689     return s->corked;
2690 }
2691
2692 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2693     pa_operation *o = userdata;
2694     int success = 1;
2695
2696     pa_assert(pd);
2697     pa_assert(o);
2698     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2699
2700     if (!o->context)
2701         goto finish;
2702
2703     if (command != PA_COMMAND_REPLY) {
2704         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2705             goto finish;
2706
2707         success = 0;
2708     } else {
2709
2710         if (!pa_tagstruct_eof(t)) {
2711             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2712             goto finish;
2713         }
2714     }
2715
2716     o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2717     pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2718
2719     if (o->callback) {
2720         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2721         cb(o->stream, success, o->userdata);
2722     }
2723
2724 finish:
2725     pa_operation_done(o);
2726     pa_operation_unref(o);
2727 }
2728
2729
2730 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2731     pa_operation *o;
2732     pa_tagstruct *t;
2733     uint32_t tag;
2734
2735     pa_assert(s);
2736     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2737
2738     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2739     PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2740     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2741     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2742     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2743     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2744
2745     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2746     o->private = PA_UINT_TO_PTR(rate);
2747
2748     t = pa_tagstruct_command(
2749             s->context,
2750             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2751             &tag);
2752     pa_tagstruct_putu32(t, s->channel);
2753     pa_tagstruct_putu32(t, rate);
2754
2755     pa_pstream_send_tagstruct(s->context->pstream, t);
2756     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2757
2758     return o;
2759 }
2760
2761 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2762     pa_operation *o;
2763     pa_tagstruct *t;
2764     uint32_t tag;
2765
2766     pa_assert(s);
2767     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2768
2769     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2770     PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2771     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2772     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2773     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2774
2775     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2776
2777     t = pa_tagstruct_command(
2778             s->context,
2779             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2780             &tag);
2781     pa_tagstruct_putu32(t, s->channel);
2782     pa_tagstruct_putu32(t, (uint32_t) mode);
2783     pa_tagstruct_put_proplist(t, p);
2784
2785     pa_pstream_send_tagstruct(s->context->pstream, t);
2786     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2787
2788     /* Please note that we don't update s->proplist here, because we
2789      * don't export that field */
2790
2791     return o;
2792 }
2793
2794 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2795     pa_operation *o;
2796     pa_tagstruct *t;
2797     uint32_t tag;
2798     const char * const*k;
2799
2800     pa_assert(s);
2801     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2802
2803     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2804     PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2805     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2806     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2807     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2808
2809     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2810
2811     t = pa_tagstruct_command(
2812             s->context,
2813             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2814             &tag);
2815     pa_tagstruct_putu32(t, s->channel);
2816
2817     for (k = keys; *k; k++)
2818         pa_tagstruct_puts(t, *k);
2819
2820     pa_tagstruct_puts(t, NULL);
2821
2822     pa_pstream_send_tagstruct(s->context->pstream, t);
2823     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2824
2825     /* Please note that we don't update s->proplist here, because we
2826      * don't export that field */
2827
2828     return o;
2829 }
2830
2831 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2832     pa_assert(s);
2833     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2834
2835     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2836     PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2837     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2838     PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2839
2840     s->direct_on_input = sink_input_idx;
2841
2842     return 0;
2843 }
2844
2845 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2846     pa_assert(s);
2847     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2848
2849     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2850     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2851     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2852
2853     return s->direct_on_input;
2854 }