stream: Fix comments
[profile/ivi/pulseaudio.git] / src / pulse / stream.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/rtclock.h>
34 #include <pulse/xmalloc.h>
35 #include <pulse/fork-detect.h>
36
37 #include <pulsecore/pstream-util.h>
38 #include <pulsecore/log.h>
39 #include <pulsecore/hashmap.h>
40 #include <pulsecore/macro.h>
41 #include <pulsecore/core-rtclock.h>
42 #include <pulsecore/core-util.h>
43
44 #include "internal.h"
45 #include "stream.h"
46
47 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
48 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
49
50 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
51 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
52 #define SMOOTHER_MIN_HISTORY (4)
53
54 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
55     return pa_stream_new_with_proplist(c, name, ss, map, NULL);
56 }
57
58 static void reset_callbacks(pa_stream *s) {
59     s->read_callback = NULL;
60     s->read_userdata = NULL;
61     s->write_callback = NULL;
62     s->write_userdata = NULL;
63     s->state_callback = NULL;
64     s->state_userdata = NULL;
65     s->overflow_callback = NULL;
66     s->overflow_userdata = NULL;
67     s->underflow_callback = NULL;
68     s->underflow_userdata = NULL;
69     s->latency_update_callback = NULL;
70     s->latency_update_userdata = NULL;
71     s->moved_callback = NULL;
72     s->moved_userdata = NULL;
73     s->suspended_callback = NULL;
74     s->suspended_userdata = NULL;
75     s->started_callback = NULL;
76     s->started_userdata = NULL;
77     s->event_callback = NULL;
78     s->event_userdata = NULL;
79     s->buffer_attr_callback = NULL;
80     s->buffer_attr_userdata = NULL;
81 }
82
83 static pa_stream *pa_stream_new_with_proplist_internal(
84         pa_context *c,
85         const char *name,
86         const pa_sample_spec *ss,
87         const pa_channel_map *map,
88         pa_format_info * const *formats,
89         unsigned int n_formats,
90         pa_proplist *p) {
91
92     pa_stream *s;
93     unsigned int i;
94
95     pa_assert(c);
96     pa_assert(PA_REFCNT_VALUE(c) >= 1);
97     pa_assert((ss == NULL && map == NULL) || (formats == NULL && n_formats == 0));
98     pa_assert(n_formats < PA_MAX_FORMATS);
99
100     PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
101     PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
102
103     s = pa_xnew(pa_stream, 1);
104     PA_REFCNT_INIT(s);
105     s->context = c;
106     s->mainloop = c->mainloop;
107
108     s->direction = PA_STREAM_NODIRECTION;
109     s->state = PA_STREAM_UNCONNECTED;
110     s->flags = 0;
111
112     if (ss)
113         s->sample_spec = *ss;
114     else
115         s->sample_spec.format = PA_SAMPLE_INVALID;
116
117     if (map)
118         s->channel_map = *map;
119     else
120         pa_channel_map_init(&s->channel_map);
121
122     s->n_formats = 0;
123     if (formats) {
124         s->n_formats = n_formats;
125         for (i = 0; i < n_formats; i++)
126             s->req_formats[i] = pa_format_info_copy(formats[i]);
127     }
128
129     /* We'll get the final negotiated format after connecting */
130     s->format = NULL;
131
132     s->direct_on_input = PA_INVALID_INDEX;
133
134     s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
135     if (name)
136         pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
137
138     s->channel = 0;
139     s->channel_valid = FALSE;
140     s->syncid = c->csyncid++;
141     s->stream_index = PA_INVALID_INDEX;
142
143     s->requested_bytes = 0;
144     memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
145
146     /* We initialize the target length here, so that if the user
147      * passes no explicit buffering metrics the default is similar to
148      * what older PA versions provided. */
149
150     s->buffer_attr.maxlength = (uint32_t) -1;
151     if (ss)
152         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
153     else {
154         /* FIXME: We assume a worst-case compressed format corresponding to
155          * 48000 Hz, 2 ch, S16 PCM, but this can very well be incorrect */
156         pa_sample_spec tmp_ss = {
157             .format   = PA_SAMPLE_S16NE,
158             .rate     = 48000,
159             .channels = 2,
160         };
161         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &tmp_ss); /* 250ms of buffering */
162     }
163     s->buffer_attr.minreq = (uint32_t) -1;
164     s->buffer_attr.prebuf = (uint32_t) -1;
165     s->buffer_attr.fragsize = (uint32_t) -1;
166
167     s->device_index = PA_INVALID_INDEX;
168     s->device_name = NULL;
169     s->suspended = FALSE;
170     s->corked = FALSE;
171
172     s->write_memblock = NULL;
173     s->write_data = NULL;
174
175     pa_memchunk_reset(&s->peek_memchunk);
176     s->peek_data = NULL;
177     s->record_memblockq = NULL;
178
179     memset(&s->timing_info, 0, sizeof(s->timing_info));
180     s->timing_info_valid = FALSE;
181
182     s->previous_time = 0;
183     s->latest_underrun_at_index = -1;
184
185     s->read_index_not_before = 0;
186     s->write_index_not_before = 0;
187     for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
188         s->write_index_corrections[i].valid = 0;
189     s->current_write_index_correction = 0;
190
191     s->auto_timing_update_event = NULL;
192     s->auto_timing_update_requested = FALSE;
193     s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
194
195     reset_callbacks(s);
196
197     s->smoother = NULL;
198
199     /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
200     PA_LLIST_PREPEND(pa_stream, c->streams, s);
201     pa_stream_ref(s);
202
203     return s;
204 }
205
206 pa_stream *pa_stream_new_with_proplist(
207         pa_context *c,
208         const char *name,
209         const pa_sample_spec *ss,
210         const pa_channel_map *map,
211         pa_proplist *p) {
212
213     pa_channel_map tmap;
214
215     PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
216     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
217     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
218     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
219     PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
220
221     if (!map)
222         PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
223
224     return pa_stream_new_with_proplist_internal(c, name, ss, map, NULL, 0, p);
225 }
226
227 pa_stream *pa_stream_new_extended(
228         pa_context *c,
229         const char *name,
230         pa_format_info * const *formats,
231         unsigned int n_formats,
232         pa_proplist *p) {
233
234     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 21, PA_ERR_NOTSUPPORTED);
235
236     return pa_stream_new_with_proplist_internal(c, name, NULL, NULL, formats, n_formats, p);
237 }
238
239 static void stream_unlink(pa_stream *s) {
240     pa_operation *o, *n;
241     pa_assert(s);
242
243     if (!s->context)
244         return;
245
246     /* Detach from context */
247
248     /* Unref all operation objects that point to us */
249     for (o = s->context->operations; o; o = n) {
250         n = o->next;
251
252         if (o->stream == s)
253             pa_operation_cancel(o);
254     }
255
256     /* Drop all outstanding replies for this stream */
257     if (s->context->pdispatch)
258         pa_pdispatch_unregister_reply(s->context->pdispatch, s);
259
260     if (s->channel_valid) {
261         pa_hashmap_remove((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel));
262         s->channel = 0;
263         s->channel_valid = FALSE;
264     }
265
266     PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
267     pa_stream_unref(s);
268
269     s->context = NULL;
270
271     if (s->auto_timing_update_event) {
272         pa_assert(s->mainloop);
273         s->mainloop->time_free(s->auto_timing_update_event);
274     }
275
276     reset_callbacks(s);
277 }
278
279 static void stream_free(pa_stream *s) {
280     unsigned int i;
281
282     pa_assert(s);
283
284     stream_unlink(s);
285
286     if (s->write_memblock) {
287         pa_memblock_release(s->write_memblock);
288         pa_memblock_unref(s->write_data);
289     }
290
291     if (s->peek_memchunk.memblock) {
292         if (s->peek_data)
293             pa_memblock_release(s->peek_memchunk.memblock);
294         pa_memblock_unref(s->peek_memchunk.memblock);
295     }
296
297     if (s->record_memblockq)
298         pa_memblockq_free(s->record_memblockq);
299
300     if (s->proplist)
301         pa_proplist_free(s->proplist);
302
303     if (s->smoother)
304         pa_smoother_free(s->smoother);
305
306     for (i = 0; i < s->n_formats; i++)
307         pa_format_info_free(s->req_formats[i]);
308
309     if (s->format)
310         pa_format_info_free(s->format);
311
312     pa_xfree(s->device_name);
313     pa_xfree(s);
314 }
315
316 void pa_stream_unref(pa_stream *s) {
317     pa_assert(s);
318     pa_assert(PA_REFCNT_VALUE(s) >= 1);
319
320     if (PA_REFCNT_DEC(s) <= 0)
321         stream_free(s);
322 }
323
324 pa_stream* pa_stream_ref(pa_stream *s) {
325     pa_assert(s);
326     pa_assert(PA_REFCNT_VALUE(s) >= 1);
327
328     PA_REFCNT_INC(s);
329     return s;
330 }
331
332 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
333     pa_assert(s);
334     pa_assert(PA_REFCNT_VALUE(s) >= 1);
335
336     return s->state;
337 }
338
339 pa_context* pa_stream_get_context(pa_stream *s) {
340     pa_assert(s);
341     pa_assert(PA_REFCNT_VALUE(s) >= 1);
342
343     return s->context;
344 }
345
346 uint32_t pa_stream_get_index(pa_stream *s) {
347     pa_assert(s);
348     pa_assert(PA_REFCNT_VALUE(s) >= 1);
349
350     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
351     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
352
353     return s->stream_index;
354 }
355
356 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
357     pa_assert(s);
358     pa_assert(PA_REFCNT_VALUE(s) >= 1);
359
360     if (s->state == st)
361         return;
362
363     pa_stream_ref(s);
364
365     s->state = st;
366
367     if (s->state_callback)
368         s->state_callback(s, s->state_userdata);
369
370     if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
371         stream_unlink(s);
372
373     pa_stream_unref(s);
374 }
375
376 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
377     pa_assert(s);
378     pa_assert(PA_REFCNT_VALUE(s) >= 1);
379
380     if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
381         return;
382
383     if (s->state == PA_STREAM_READY &&
384         (force || !s->auto_timing_update_requested)) {
385         pa_operation *o;
386
387 /*         pa_log("Automatically requesting new timing data"); */
388
389         if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
390             pa_operation_unref(o);
391             s->auto_timing_update_requested = TRUE;
392         }
393     }
394
395     if (s->auto_timing_update_event) {
396         if (s->suspended && !force) {
397             pa_assert(s->mainloop);
398             s->mainloop->time_free(s->auto_timing_update_event);
399             s->auto_timing_update_event = NULL;
400         } else {
401             if (force)
402                 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
403
404             pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
405
406             s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
407         }
408     }
409 }
410
411 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
412     pa_context *c = userdata;
413     pa_stream *s;
414     uint32_t channel;
415
416     pa_assert(pd);
417     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
418     pa_assert(t);
419     pa_assert(c);
420     pa_assert(PA_REFCNT_VALUE(c) >= 1);
421
422     pa_context_ref(c);
423
424     if (pa_tagstruct_getu32(t, &channel) < 0 ||
425         !pa_tagstruct_eof(t)) {
426         pa_context_fail(c, PA_ERR_PROTOCOL);
427         goto finish;
428     }
429
430     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
431         goto finish;
432
433     if (s->state != PA_STREAM_READY)
434         goto finish;
435
436     pa_context_set_error(c, PA_ERR_KILLED);
437     pa_stream_set_state(s, PA_STREAM_FAILED);
438
439 finish:
440     pa_context_unref(c);
441 }
442
443 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
444     pa_usec_t x;
445
446     pa_assert(s);
447     pa_assert(!force_start || !force_stop);
448
449     if (!s->smoother)
450         return;
451
452     x = pa_rtclock_now();
453
454     if (s->timing_info_valid) {
455         if (aposteriori)
456             x -= s->timing_info.transport_usec;
457         else
458             x += s->timing_info.transport_usec;
459     }
460
461     if (s->suspended || s->corked || force_stop)
462         pa_smoother_pause(s->smoother, x);
463     else if (force_start || s->buffer_attr.prebuf == 0) {
464
465         if (!s->timing_info_valid &&
466             !aposteriori &&
467             !force_start &&
468             !force_stop &&
469             s->context->version >= 13) {
470
471             /* If the server supports STARTED events we take them as
472              * indications when audio really starts/stops playing, if
473              * we don't have any timing info yet -- instead of trying
474              * to be smart and guessing the server time. Otherwise the
475              * unknown transport delay adds too much noise to our time
476              * calculations. */
477
478             return;
479         }
480
481         pa_smoother_resume(s->smoother, x, TRUE);
482     }
483
484     /* Please note that we have no idea if playback actually started
485      * if prebuf is non-zero! */
486 }
487
488 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata);
489
490 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
491     pa_context *c = userdata;
492     pa_stream *s;
493     uint32_t channel;
494     const char *dn;
495     pa_bool_t suspended;
496     uint32_t di;
497     pa_usec_t usec = 0;
498     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
499
500     pa_assert(pd);
501     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
502     pa_assert(t);
503     pa_assert(c);
504     pa_assert(PA_REFCNT_VALUE(c) >= 1);
505
506     pa_context_ref(c);
507
508     if (c->version < 12) {
509         pa_context_fail(c, PA_ERR_PROTOCOL);
510         goto finish;
511     }
512
513     if (pa_tagstruct_getu32(t, &channel) < 0 ||
514         pa_tagstruct_getu32(t, &di) < 0 ||
515         pa_tagstruct_gets(t, &dn) < 0 ||
516         pa_tagstruct_get_boolean(t, &suspended) < 0) {
517         pa_context_fail(c, PA_ERR_PROTOCOL);
518         goto finish;
519     }
520
521     if (c->version >= 13) {
522
523         if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
524             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
525                 pa_tagstruct_getu32(t, &fragsize) < 0 ||
526                 pa_tagstruct_get_usec(t, &usec) < 0) {
527                 pa_context_fail(c, PA_ERR_PROTOCOL);
528                 goto finish;
529             }
530         } else {
531             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
532                 pa_tagstruct_getu32(t, &tlength) < 0 ||
533                 pa_tagstruct_getu32(t, &prebuf) < 0 ||
534                 pa_tagstruct_getu32(t, &minreq) < 0 ||
535                 pa_tagstruct_get_usec(t, &usec) < 0) {
536                 pa_context_fail(c, PA_ERR_PROTOCOL);
537                 goto finish;
538             }
539         }
540     }
541
542     if (!pa_tagstruct_eof(t)) {
543         pa_context_fail(c, PA_ERR_PROTOCOL);
544         goto finish;
545     }
546
547     if (!dn || di == PA_INVALID_INDEX) {
548         pa_context_fail(c, PA_ERR_PROTOCOL);
549         goto finish;
550     }
551
552     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
553         goto finish;
554
555     if (s->state != PA_STREAM_READY)
556         goto finish;
557
558     if (c->version >= 13) {
559         if (s->direction == PA_STREAM_RECORD)
560             s->timing_info.configured_source_usec = usec;
561         else
562             s->timing_info.configured_sink_usec = usec;
563
564         s->buffer_attr.maxlength = maxlength;
565         s->buffer_attr.fragsize = fragsize;
566         s->buffer_attr.tlength = tlength;
567         s->buffer_attr.prebuf = prebuf;
568         s->buffer_attr.minreq = minreq;
569     }
570
571     pa_xfree(s->device_name);
572     s->device_name = pa_xstrdup(dn);
573     s->device_index = di;
574
575     s->suspended = suspended;
576
577     if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
578         s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
579         s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
580         request_auto_timing_update(s, TRUE);
581     }
582
583     check_smoother_status(s, TRUE, FALSE, FALSE);
584     request_auto_timing_update(s, TRUE);
585
586     if (s->moved_callback)
587         s->moved_callback(s, s->moved_userdata);
588
589 finish:
590     pa_context_unref(c);
591 }
592
593 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
594     pa_context *c = userdata;
595     pa_stream *s;
596     uint32_t channel;
597     pa_usec_t usec = 0;
598     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
599
600     pa_assert(pd);
601     pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
602     pa_assert(t);
603     pa_assert(c);
604     pa_assert(PA_REFCNT_VALUE(c) >= 1);
605
606     pa_context_ref(c);
607
608     if (c->version < 15) {
609         pa_context_fail(c, PA_ERR_PROTOCOL);
610         goto finish;
611     }
612
613     if (pa_tagstruct_getu32(t, &channel) < 0) {
614         pa_context_fail(c, PA_ERR_PROTOCOL);
615         goto finish;
616     }
617
618     if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
619         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
620             pa_tagstruct_getu32(t, &fragsize) < 0 ||
621             pa_tagstruct_get_usec(t, &usec) < 0) {
622             pa_context_fail(c, PA_ERR_PROTOCOL);
623             goto finish;
624         }
625     } else {
626         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
627             pa_tagstruct_getu32(t, &tlength) < 0 ||
628             pa_tagstruct_getu32(t, &prebuf) < 0 ||
629             pa_tagstruct_getu32(t, &minreq) < 0 ||
630             pa_tagstruct_get_usec(t, &usec) < 0) {
631             pa_context_fail(c, PA_ERR_PROTOCOL);
632             goto finish;
633         }
634     }
635
636     if (!pa_tagstruct_eof(t)) {
637         pa_context_fail(c, PA_ERR_PROTOCOL);
638         goto finish;
639     }
640
641     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
642         goto finish;
643
644     if (s->state != PA_STREAM_READY)
645         goto finish;
646
647     if (s->direction == PA_STREAM_RECORD)
648         s->timing_info.configured_source_usec = usec;
649     else
650         s->timing_info.configured_sink_usec = usec;
651
652     s->buffer_attr.maxlength = maxlength;
653     s->buffer_attr.fragsize = fragsize;
654     s->buffer_attr.tlength = tlength;
655     s->buffer_attr.prebuf = prebuf;
656     s->buffer_attr.minreq = minreq;
657
658     request_auto_timing_update(s, TRUE);
659
660     if (s->buffer_attr_callback)
661         s->buffer_attr_callback(s, s->buffer_attr_userdata);
662
663 finish:
664     pa_context_unref(c);
665 }
666
667 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
668     pa_context *c = userdata;
669     pa_stream *s;
670     uint32_t channel;
671     pa_bool_t suspended;
672
673     pa_assert(pd);
674     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
675     pa_assert(t);
676     pa_assert(c);
677     pa_assert(PA_REFCNT_VALUE(c) >= 1);
678
679     pa_context_ref(c);
680
681     if (c->version < 12) {
682         pa_context_fail(c, PA_ERR_PROTOCOL);
683         goto finish;
684     }
685
686     if (pa_tagstruct_getu32(t, &channel) < 0 ||
687         pa_tagstruct_get_boolean(t, &suspended) < 0 ||
688         !pa_tagstruct_eof(t)) {
689         pa_context_fail(c, PA_ERR_PROTOCOL);
690         goto finish;
691     }
692
693     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
694         goto finish;
695
696     if (s->state != PA_STREAM_READY)
697         goto finish;
698
699     s->suspended = suspended;
700
701     if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
702         s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
703         s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
704         request_auto_timing_update(s, TRUE);
705     }
706
707     check_smoother_status(s, TRUE, FALSE, FALSE);
708     request_auto_timing_update(s, TRUE);
709
710     if (s->suspended_callback)
711         s->suspended_callback(s, s->suspended_userdata);
712
713 finish:
714     pa_context_unref(c);
715 }
716
717 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
718     pa_context *c = userdata;
719     pa_stream *s;
720     uint32_t channel;
721
722     pa_assert(pd);
723     pa_assert(command == PA_COMMAND_STARTED);
724     pa_assert(t);
725     pa_assert(c);
726     pa_assert(PA_REFCNT_VALUE(c) >= 1);
727
728     pa_context_ref(c);
729
730     if (c->version < 13) {
731         pa_context_fail(c, PA_ERR_PROTOCOL);
732         goto finish;
733     }
734
735     if (pa_tagstruct_getu32(t, &channel) < 0 ||
736         !pa_tagstruct_eof(t)) {
737         pa_context_fail(c, PA_ERR_PROTOCOL);
738         goto finish;
739     }
740
741     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
742         goto finish;
743
744     if (s->state != PA_STREAM_READY)
745         goto finish;
746
747     check_smoother_status(s, TRUE, TRUE, FALSE);
748     request_auto_timing_update(s, TRUE);
749
750     if (s->started_callback)
751         s->started_callback(s, s->started_userdata);
752
753 finish:
754     pa_context_unref(c);
755 }
756
757 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
758     pa_context *c = userdata;
759     pa_stream *s;
760     uint32_t channel;
761     pa_proplist *pl = NULL;
762     const char *event;
763
764     pa_assert(pd);
765     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
766     pa_assert(t);
767     pa_assert(c);
768     pa_assert(PA_REFCNT_VALUE(c) >= 1);
769
770     pa_context_ref(c);
771
772     if (c->version < 15) {
773         pa_context_fail(c, PA_ERR_PROTOCOL);
774         goto finish;
775     }
776
777     pl = pa_proplist_new();
778
779     if (pa_tagstruct_getu32(t, &channel) < 0 ||
780         pa_tagstruct_gets(t, &event) < 0 ||
781         pa_tagstruct_get_proplist(t, pl) < 0 ||
782         !pa_tagstruct_eof(t) || !event) {
783         pa_context_fail(c, PA_ERR_PROTOCOL);
784         goto finish;
785     }
786
787     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
788         goto finish;
789
790     if (s->state != PA_STREAM_READY)
791         goto finish;
792
793     if (pa_streq(event, PA_STREAM_EVENT_FORMAT_LOST)) {
794         /* Let client know what the running time was when the stream had to be killed  */
795         pa_usec_t stream_time;
796         if (pa_stream_get_time(s, &stream_time) == 0)
797             pa_proplist_setf(pl, "stream-time", "%llu", (unsigned long long) stream_time);
798     }
799
800     if (s->event_callback)
801         s->event_callback(s, event, pl, s->event_userdata);
802
803 finish:
804     pa_context_unref(c);
805
806     if (pl)
807         pa_proplist_free(pl);
808 }
809
810 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
811     pa_stream *s;
812     pa_context *c = userdata;
813     uint32_t bytes, channel;
814
815     pa_assert(pd);
816     pa_assert(command == PA_COMMAND_REQUEST);
817     pa_assert(t);
818     pa_assert(c);
819     pa_assert(PA_REFCNT_VALUE(c) >= 1);
820
821     pa_context_ref(c);
822
823     if (pa_tagstruct_getu32(t, &channel) < 0 ||
824         pa_tagstruct_getu32(t, &bytes) < 0 ||
825         !pa_tagstruct_eof(t)) {
826         pa_context_fail(c, PA_ERR_PROTOCOL);
827         goto finish;
828     }
829
830     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
831         goto finish;
832
833     if (s->state != PA_STREAM_READY)
834         goto finish;
835
836     s->requested_bytes += bytes;
837
838     /* pa_log("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes); */
839
840     if (s->requested_bytes > 0 && s->write_callback)
841         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
842
843 finish:
844     pa_context_unref(c);
845 }
846
847 int64_t pa_stream_get_underflow_index(pa_stream *p)
848 {
849     pa_assert(p);
850     return p->latest_underrun_at_index;
851 }
852
853 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
854     pa_stream *s;
855     pa_context *c = userdata;
856     uint32_t channel;
857     int64_t offset = -1;
858
859     pa_assert(pd);
860     pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
861     pa_assert(t);
862     pa_assert(c);
863     pa_assert(PA_REFCNT_VALUE(c) >= 1);
864
865     pa_context_ref(c);
866
867     if (pa_tagstruct_getu32(t, &channel) < 0) {
868         pa_context_fail(c, PA_ERR_PROTOCOL);
869         goto finish;
870     }
871
872     if (c->version >= 23 && command == PA_COMMAND_UNDERFLOW) {
873         if (pa_tagstruct_gets64(t, &offset) < 0) {
874             pa_context_fail(c, PA_ERR_PROTOCOL);
875             goto finish;
876         }
877     }
878
879     if (!pa_tagstruct_eof(t)) {
880         pa_context_fail(c, PA_ERR_PROTOCOL);
881         goto finish;
882     }
883
884     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
885         goto finish;
886
887     if (s->state != PA_STREAM_READY)
888         goto finish;
889
890     if (offset != -1)
891         s->latest_underrun_at_index = offset;
892
893     if (s->buffer_attr.prebuf > 0)
894         check_smoother_status(s, TRUE, FALSE, TRUE);
895
896     request_auto_timing_update(s, TRUE);
897
898     if (command == PA_COMMAND_OVERFLOW) {
899         if (s->overflow_callback)
900             s->overflow_callback(s, s->overflow_userdata);
901     } else if (command == PA_COMMAND_UNDERFLOW) {
902         if (s->underflow_callback)
903             s->underflow_callback(s, s->underflow_userdata);
904     }
905
906 finish:
907     pa_context_unref(c);
908 }
909
910 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
911     pa_assert(s);
912     pa_assert(PA_REFCNT_VALUE(s) >= 1);
913
914 /*     pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
915
916     if (s->state != PA_STREAM_READY)
917         return;
918
919     if (w) {
920         s->write_index_not_before = s->context->ctag;
921
922         if (s->timing_info_valid)
923             s->timing_info.write_index_corrupt = TRUE;
924
925 /*         pa_log("write_index invalidated"); */
926     }
927
928     if (r) {
929         s->read_index_not_before = s->context->ctag;
930
931         if (s->timing_info_valid)
932             s->timing_info.read_index_corrupt = TRUE;
933
934 /*         pa_log("read_index invalidated"); */
935     }
936
937     request_auto_timing_update(s, TRUE);
938 }
939
940 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
941     pa_stream *s = userdata;
942
943     pa_assert(s);
944     pa_assert(PA_REFCNT_VALUE(s) >= 1);
945
946     pa_stream_ref(s);
947     request_auto_timing_update(s, FALSE);
948     pa_stream_unref(s);
949 }
950
951 static void create_stream_complete(pa_stream *s) {
952     pa_assert(s);
953     pa_assert(PA_REFCNT_VALUE(s) >= 1);
954     pa_assert(s->state == PA_STREAM_CREATING);
955
956     pa_stream_set_state(s, PA_STREAM_READY);
957
958     if (s->requested_bytes > 0 && s->write_callback)
959         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
960
961     if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
962         s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
963         pa_assert(!s->auto_timing_update_event);
964         s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
965
966         request_auto_timing_update(s, TRUE);
967     }
968
969     check_smoother_status(s, TRUE, FALSE, FALSE);
970 }
971
972 static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
973     const char *e;
974
975     pa_assert(s);
976     pa_assert(attr);
977
978     if ((e = getenv("PULSE_LATENCY_MSEC"))) {
979         uint32_t ms;
980
981         if (pa_atou(e, &ms) < 0 || ms <= 0)
982             pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
983         else {
984             attr->maxlength = (uint32_t) -1;
985             attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &s->sample_spec);
986             attr->minreq = (uint32_t) -1;
987             attr->prebuf = (uint32_t) -1;
988             attr->fragsize = attr->tlength;
989         }
990
991         if (flags)
992             *flags |= PA_STREAM_ADJUST_LATENCY;
993     }
994
995     if (s->context->version >= 13)
996         return;
997
998     /* Version older than 0.9.10 didn't do server side buffer_attr
999      * selection, hence we have to fake it on the client side. */
1000
1001     /* We choose fairly conservative values here, to not confuse
1002      * old clients with extremely large playback buffers */
1003
1004     if (attr->maxlength == (uint32_t) -1)
1005         attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
1006
1007     if (attr->tlength == (uint32_t) -1)
1008         attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */
1009
1010     if (attr->minreq == (uint32_t) -1)
1011         attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
1012
1013     if (attr->prebuf == (uint32_t) -1)
1014         attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
1015
1016     if (attr->fragsize == (uint32_t) -1)
1017         attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
1018 }
1019
1020 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1021     pa_stream *s = userdata;
1022     uint32_t requested_bytes = 0;
1023
1024     pa_assert(pd);
1025     pa_assert(s);
1026     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1027     pa_assert(s->state == PA_STREAM_CREATING);
1028
1029     pa_stream_ref(s);
1030
1031     if (command != PA_COMMAND_REPLY) {
1032         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1033             goto finish;
1034
1035         pa_stream_set_state(s, PA_STREAM_FAILED);
1036         goto finish;
1037     }
1038
1039     if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
1040         s->channel == PA_INVALID_INDEX ||
1041         ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
1042         ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
1043         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1044         goto finish;
1045     }
1046
1047     s->requested_bytes = (int64_t) requested_bytes;
1048
1049     if (s->context->version >= 9) {
1050         if (s->direction == PA_STREAM_PLAYBACK) {
1051             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1052                 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
1053                 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
1054                 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
1055                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1056                 goto finish;
1057             }
1058         } else if (s->direction == PA_STREAM_RECORD) {
1059             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1060                 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
1061                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1062                 goto finish;
1063             }
1064         }
1065     }
1066
1067     if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
1068         pa_sample_spec ss;
1069         pa_channel_map cm;
1070         const char *dn = NULL;
1071         pa_bool_t suspended;
1072
1073         if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1074             pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1075             pa_tagstruct_getu32(t, &s->device_index) < 0 ||
1076             pa_tagstruct_gets(t, &dn) < 0 ||
1077             pa_tagstruct_get_boolean(t, &suspended) < 0) {
1078             pa_context_fail(s->context, PA_ERR_PROTOCOL);
1079             goto finish;
1080         }
1081
1082         if (!dn || s->device_index == PA_INVALID_INDEX ||
1083             ss.channels != cm.channels ||
1084             !pa_channel_map_valid(&cm) ||
1085             !pa_sample_spec_valid(&ss) ||
1086             (s->n_formats == 0 && (
1087                 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
1088                 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
1089                 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))))) {
1090             pa_context_fail(s->context, PA_ERR_PROTOCOL);
1091             goto finish;
1092         }
1093
1094         pa_xfree(s->device_name);
1095         s->device_name = pa_xstrdup(dn);
1096         s->suspended = suspended;
1097
1098         s->channel_map = cm;
1099         s->sample_spec = ss;
1100     }
1101
1102     if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
1103         pa_usec_t usec;
1104
1105         if (pa_tagstruct_get_usec(t, &usec) < 0) {
1106             pa_context_fail(s->context, PA_ERR_PROTOCOL);
1107             goto finish;
1108         }
1109
1110         if (s->direction == PA_STREAM_RECORD)
1111             s->timing_info.configured_source_usec = usec;
1112         else
1113             s->timing_info.configured_sink_usec = usec;
1114     }
1115
1116     if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
1117         || s->context->version >= 22) {
1118
1119         pa_format_info *f = pa_format_info_new();
1120         pa_tagstruct_get_format_info(t, f);
1121
1122         if (pa_format_info_valid(f))
1123             s->format = f;
1124         else {
1125             pa_format_info_free(f);
1126             if (s->n_formats > 0) {
1127                 /* We used the extended API, so we should have got back a proper format */
1128                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1129                 goto finish;
1130             }
1131         }
1132     }
1133
1134     if (!pa_tagstruct_eof(t)) {
1135         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1136         goto finish;
1137     }
1138
1139     if (s->direction == PA_STREAM_RECORD) {
1140         pa_assert(!s->record_memblockq);
1141
1142         s->record_memblockq = pa_memblockq_new(
1143                 "client side record memblockq",
1144                 0,
1145                 s->buffer_attr.maxlength,
1146                 0,
1147                 &s->sample_spec,
1148                 1,
1149                 0,
1150                 0,
1151                 NULL);
1152     }
1153
1154     s->channel_valid = TRUE;
1155     pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);
1156
1157     create_stream_complete(s);
1158
1159 finish:
1160     pa_stream_unref(s);
1161 }
1162
1163 static int create_stream(
1164         pa_stream_direction_t direction,
1165         pa_stream *s,
1166         const char *dev,
1167         const pa_buffer_attr *attr,
1168         pa_stream_flags_t flags,
1169         const pa_cvolume *volume,
1170         pa_stream *sync_stream) {
1171
1172     pa_tagstruct *t;
1173     uint32_t tag;
1174     pa_bool_t volume_set = !!volume;
1175     pa_cvolume cv;
1176     uint32_t i;
1177
1178     pa_assert(s);
1179     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1180     pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1181
1182     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1183     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1184     PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1185     PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1186                                               PA_STREAM_INTERPOLATE_TIMING|
1187                                               PA_STREAM_NOT_MONOTONIC|
1188                                               PA_STREAM_AUTO_TIMING_UPDATE|
1189                                               PA_STREAM_NO_REMAP_CHANNELS|
1190                                               PA_STREAM_NO_REMIX_CHANNELS|
1191                                               PA_STREAM_FIX_FORMAT|
1192                                               PA_STREAM_FIX_RATE|
1193                                               PA_STREAM_FIX_CHANNELS|
1194                                               PA_STREAM_DONT_MOVE|
1195                                               PA_STREAM_VARIABLE_RATE|
1196                                               PA_STREAM_PEAK_DETECT|
1197                                               PA_STREAM_START_MUTED|
1198                                               PA_STREAM_ADJUST_LATENCY|
1199                                               PA_STREAM_EARLY_REQUESTS|
1200                                               PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1201                                               PA_STREAM_START_UNMUTED|
1202                                               PA_STREAM_FAIL_ON_SUSPEND|
1203                                               PA_STREAM_RELATIVE_VOLUME|
1204                                               PA_STREAM_PASSTHROUGH)), PA_ERR_INVALID);
1205
1206
1207     PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1208     PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1209     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1210     /* Although some of the other flags are not supported on older
1211      * version, we don't check for them here, because it doesn't hurt
1212      * when they are passed but actually not supported. This makes
1213      * client development easier */
1214
1215     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1216     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1217     PA_CHECK_VALIDITY(s->context, !volume || s->n_formats || (pa_sample_spec_valid(&s->sample_spec) && volume->channels == s->sample_spec.channels), PA_ERR_INVALID);
1218     PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1219     PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1220
1221     pa_stream_ref(s);
1222
1223     s->direction = direction;
1224
1225     if (sync_stream)
1226         s->syncid = sync_stream->syncid;
1227
1228     if (attr)
1229         s->buffer_attr = *attr;
1230     patch_buffer_attr(s, &s->buffer_attr, &flags);
1231
1232     s->flags = flags;
1233     s->corked = !!(flags & PA_STREAM_START_CORKED);
1234
1235     if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1236         pa_usec_t x;
1237
1238         x = pa_rtclock_now();
1239
1240         pa_assert(!s->smoother);
1241         s->smoother = pa_smoother_new(
1242                 SMOOTHER_ADJUST_TIME,
1243                 SMOOTHER_HISTORY_TIME,
1244                 !(flags & PA_STREAM_NOT_MONOTONIC),
1245                 TRUE,
1246                 SMOOTHER_MIN_HISTORY,
1247                 x,
1248                 TRUE);
1249     }
1250
1251     if (!dev)
1252         dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1253
1254     t = pa_tagstruct_command(
1255             s->context,
1256             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1257             &tag);
1258
1259     if (s->context->version < 13)
1260         pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1261
1262     pa_tagstruct_put(
1263             t,
1264             PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1265             PA_TAG_CHANNEL_MAP, &s->channel_map,
1266             PA_TAG_U32, PA_INVALID_INDEX,
1267             PA_TAG_STRING, dev,
1268             PA_TAG_U32, s->buffer_attr.maxlength,
1269             PA_TAG_BOOLEAN, s->corked,
1270             PA_TAG_INVALID);
1271
1272     if (!volume) {
1273         if (pa_sample_spec_valid(&s->sample_spec))
1274             volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1275         else {
1276             /* This is not really relevant, since no volume was set, and
1277              * the real number of channels is embedded in the format_info
1278              * structure */
1279             volume = pa_cvolume_reset(&cv, PA_CHANNELS_MAX);
1280         }
1281     }
1282
1283     if (s->direction == PA_STREAM_PLAYBACK) {
1284         pa_tagstruct_put(
1285                 t,
1286                 PA_TAG_U32, s->buffer_attr.tlength,
1287                 PA_TAG_U32, s->buffer_attr.prebuf,
1288                 PA_TAG_U32, s->buffer_attr.minreq,
1289                 PA_TAG_U32, s->syncid,
1290                 PA_TAG_INVALID);
1291
1292         pa_tagstruct_put_cvolume(t, volume);
1293     } else
1294         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1295
1296     if (s->context->version >= 12) {
1297         pa_tagstruct_put(
1298                 t,
1299                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1300                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1301                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1302                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1303                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1304                 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1305                 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1306                 PA_TAG_INVALID);
1307     }
1308
1309     if (s->context->version >= 13) {
1310
1311         if (s->direction == PA_STREAM_PLAYBACK)
1312             pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1313         else
1314             pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1315
1316         pa_tagstruct_put(
1317                 t,
1318                 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1319                 PA_TAG_PROPLIST, s->proplist,
1320                 PA_TAG_INVALID);
1321
1322         if (s->direction == PA_STREAM_RECORD)
1323             pa_tagstruct_putu32(t, s->direct_on_input);
1324     }
1325
1326     if (s->context->version >= 14) {
1327
1328         if (s->direction == PA_STREAM_PLAYBACK)
1329             pa_tagstruct_put_boolean(t, volume_set);
1330
1331         pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1332     }
1333
1334     if (s->context->version >= 15) {
1335
1336         if (s->direction == PA_STREAM_PLAYBACK)
1337             pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1338
1339         pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1340         pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1341     }
1342
1343     if (s->context->version >= 17 && s->direction == PA_STREAM_PLAYBACK)
1344         pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1345
1346     if (s->context->version >= 18 && s->direction == PA_STREAM_PLAYBACK)
1347         pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1348
1349     if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
1350         || s->context->version >= 22) {
1351
1352         pa_tagstruct_putu8(t, s->n_formats);
1353         for (i = 0; i < s->n_formats; i++)
1354             pa_tagstruct_put_format_info(t, s->req_formats[i]);
1355     }
1356
1357     if (s->context->version >= 22 && s->direction == PA_STREAM_RECORD) {
1358         pa_tagstruct_put_cvolume(t, volume);
1359         pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1360         pa_tagstruct_put_boolean(t, volume_set);
1361         pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1362         pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1363         pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1364     }
1365
1366     pa_pstream_send_tagstruct(s->context->pstream, t);
1367     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1368
1369     pa_stream_set_state(s, PA_STREAM_CREATING);
1370
1371     pa_stream_unref(s);
1372     return 0;
1373 }
1374
1375 int pa_stream_connect_playback(
1376         pa_stream *s,
1377         const char *dev,
1378         const pa_buffer_attr *attr,
1379         pa_stream_flags_t flags,
1380         const pa_cvolume *volume,
1381         pa_stream *sync_stream) {
1382
1383     pa_assert(s);
1384     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1385
1386     return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1387 }
1388
1389 int pa_stream_connect_record(
1390         pa_stream *s,
1391         const char *dev,
1392         const pa_buffer_attr *attr,
1393         pa_stream_flags_t flags) {
1394
1395     pa_assert(s);
1396     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1397
1398     return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1399 }
1400
1401 int pa_stream_begin_write(
1402         pa_stream *s,
1403         void **data,
1404         size_t *nbytes) {
1405
1406     pa_assert(s);
1407     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1408
1409     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1410     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1411     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1412     PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1413     PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1414
1415     if (*nbytes != (size_t) -1) {
1416         size_t m, fs;
1417
1418         m = pa_mempool_block_size_max(s->context->mempool);
1419         fs = pa_frame_size(&s->sample_spec);
1420
1421         m = (m / fs) * fs;
1422         if (*nbytes > m)
1423             *nbytes = m;
1424     }
1425
1426     if (!s->write_memblock) {
1427         s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1428         s->write_data = pa_memblock_acquire(s->write_memblock);
1429     }
1430
1431     *data = s->write_data;
1432     *nbytes = pa_memblock_get_length(s->write_memblock);
1433
1434     return 0;
1435 }
1436
1437 int pa_stream_cancel_write(
1438         pa_stream *s) {
1439
1440     pa_assert(s);
1441     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1442
1443     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1444     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1445     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1446     PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1447
1448     pa_assert(s->write_data);
1449
1450     pa_memblock_release(s->write_memblock);
1451     pa_memblock_unref(s->write_memblock);
1452     s->write_memblock = NULL;
1453     s->write_data = NULL;
1454
1455     return 0;
1456 }
1457
1458 int pa_stream_write(
1459         pa_stream *s,
1460         const void *data,
1461         size_t length,
1462         pa_free_cb_t free_cb,
1463         int64_t offset,
1464         pa_seek_mode_t seek) {
1465
1466     pa_assert(s);
1467     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1468     pa_assert(data);
1469
1470     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1471     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1472     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1473     PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1474     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1475     PA_CHECK_VALIDITY(s->context,
1476                       !s->write_memblock ||
1477                       ((data >= s->write_data) &&
1478                        ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1479                       PA_ERR_INVALID);
1480     PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1481
1482     if (s->write_memblock) {
1483         pa_memchunk chunk;
1484
1485         /* pa_stream_write_begin() was called before */
1486
1487         pa_memblock_release(s->write_memblock);
1488
1489         chunk.memblock = s->write_memblock;
1490         chunk.index = (const char *) data - (const char *) s->write_data;
1491         chunk.length = length;
1492
1493         s->write_memblock = NULL;
1494         s->write_data = NULL;
1495
1496         pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1497         pa_memblock_unref(chunk.memblock);
1498
1499     } else {
1500         pa_seek_mode_t t_seek = seek;
1501         int64_t t_offset = offset;
1502         size_t t_length = length;
1503         const void *t_data = data;
1504
1505         /* pa_stream_write_begin() was not called before */
1506
1507         while (t_length > 0) {
1508             pa_memchunk chunk;
1509
1510             chunk.index = 0;
1511
1512             if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1513                 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1514                 chunk.length = t_length;
1515             } else {
1516                 void *d;
1517
1518                 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1519                 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1520
1521                 d = pa_memblock_acquire(chunk.memblock);
1522                 memcpy(d, t_data, chunk.length);
1523                 pa_memblock_release(chunk.memblock);
1524             }
1525
1526             pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1527
1528             t_offset = 0;
1529             t_seek = PA_SEEK_RELATIVE;
1530
1531             t_data = (const uint8_t*) t_data + chunk.length;
1532             t_length -= chunk.length;
1533
1534             pa_memblock_unref(chunk.memblock);
1535         }
1536
1537         if (free_cb && pa_pstream_get_shm(s->context->pstream))
1538             free_cb((void*) data);
1539     }
1540
1541     /* This is obviously wrong since we ignore the seeking index . But
1542      * that's OK, the server side applies the same error */
1543     s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1544
1545     /* pa_log("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes); */
1546
1547     if (s->direction == PA_STREAM_PLAYBACK) {
1548
1549         /* Update latency request correction */
1550         if (s->write_index_corrections[s->current_write_index_correction].valid) {
1551
1552             if (seek == PA_SEEK_ABSOLUTE) {
1553                 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1554                 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1555                 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1556             } else if (seek == PA_SEEK_RELATIVE) {
1557                 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1558                     s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1559             } else
1560                 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1561         }
1562
1563         /* Update the write index in the already available latency data */
1564         if (s->timing_info_valid) {
1565
1566             if (seek == PA_SEEK_ABSOLUTE) {
1567                 s->timing_info.write_index_corrupt = FALSE;
1568                 s->timing_info.write_index = offset + (int64_t) length;
1569             } else if (seek == PA_SEEK_RELATIVE) {
1570                 if (!s->timing_info.write_index_corrupt)
1571                     s->timing_info.write_index += offset + (int64_t) length;
1572             } else
1573                 s->timing_info.write_index_corrupt = TRUE;
1574         }
1575
1576         if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1577             request_auto_timing_update(s, TRUE);
1578     }
1579
1580     return 0;
1581 }
1582
1583 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1584     pa_assert(s);
1585     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1586     pa_assert(data);
1587     pa_assert(length);
1588
1589     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1590     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1591     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1592
1593     if (!s->peek_memchunk.memblock) {
1594
1595         if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1596             *data = NULL;
1597             *length = 0;
1598             return 0;
1599         }
1600
1601         s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1602     }
1603
1604     pa_assert(s->peek_data);
1605     *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1606     *length = s->peek_memchunk.length;
1607     return 0;
1608 }
1609
1610 int pa_stream_drop(pa_stream *s) {
1611     pa_assert(s);
1612     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1613
1614     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1615     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1616     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1617     PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1618
1619     pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1620
1621     /* Fix the simulated local read index */
1622     if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1623         s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1624
1625     pa_assert(s->peek_data);
1626     pa_memblock_release(s->peek_memchunk.memblock);
1627     pa_memblock_unref(s->peek_memchunk.memblock);
1628     pa_memchunk_reset(&s->peek_memchunk);
1629
1630     return 0;
1631 }
1632
1633 size_t pa_stream_writable_size(pa_stream *s) {
1634     pa_assert(s);
1635     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1636
1637     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1638     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1639     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1640
1641     return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1642 }
1643
1644 size_t pa_stream_readable_size(pa_stream *s) {
1645     pa_assert(s);
1646     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1647
1648     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1649     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1650     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1651
1652     return pa_memblockq_get_length(s->record_memblockq);
1653 }
1654
1655 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1656     pa_operation *o;
1657     pa_tagstruct *t;
1658     uint32_t tag;
1659
1660     pa_assert(s);
1661     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1662
1663     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1664     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1665     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1666
1667     /* Ask for a timing update before we cork/uncork to get the best
1668      * accuracy for the transport latency suitable for the
1669      * check_smoother_status() call in the started callback */
1670     request_auto_timing_update(s, TRUE);
1671
1672     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1673
1674     t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1675     pa_tagstruct_putu32(t, s->channel);
1676     pa_pstream_send_tagstruct(s->context->pstream, t);
1677     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1678
1679     /* This might cause the read index to continue again, hence
1680      * let's request a timing update */
1681     request_auto_timing_update(s, TRUE);
1682
1683     return o;
1684 }
1685
1686 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1687     pa_usec_t usec;
1688
1689     pa_assert(s);
1690     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1691     pa_assert(s->state == PA_STREAM_READY);
1692     pa_assert(s->direction != PA_STREAM_UPLOAD);
1693     pa_assert(s->timing_info_valid);
1694     pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1695     pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1696
1697     if (s->direction == PA_STREAM_PLAYBACK) {
1698         /* The last byte that was written into the output device
1699          * had this time value associated */
1700         usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1701
1702         if (!s->corked && !s->suspended) {
1703
1704             if (!ignore_transport)
1705                 /* Because the latency info took a little time to come
1706                  * to us, we assume that the real output time is actually
1707                  * a little ahead */
1708                 usec += s->timing_info.transport_usec;
1709
1710             /* However, the output device usually maintains a buffer
1711                too, hence the real sample currently played is a little
1712                back  */
1713             if (s->timing_info.sink_usec >= usec)
1714                 usec = 0;
1715             else
1716                 usec -= s->timing_info.sink_usec;
1717         }
1718
1719     } else {
1720         pa_assert(s->direction == PA_STREAM_RECORD);
1721
1722         /* The last byte written into the server side queue had
1723          * this time value associated */
1724         usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1725
1726         if (!s->corked && !s->suspended) {
1727
1728             if (!ignore_transport)
1729                 /* Add transport latency */
1730                 usec += s->timing_info.transport_usec;
1731
1732             /* Add latency of data in device buffer */
1733             usec += s->timing_info.source_usec;
1734
1735             /* If this is a monitor source, we need to correct the
1736              * time by the playback device buffer */
1737             if (s->timing_info.sink_usec >= usec)
1738                 usec = 0;
1739             else
1740                 usec -= s->timing_info.sink_usec;
1741         }
1742     }
1743
1744     return usec;
1745 }
1746
1747 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1748     pa_operation *o = userdata;
1749     struct timeval local, remote, now;
1750     pa_timing_info *i;
1751     pa_bool_t playing = FALSE;
1752     uint64_t underrun_for = 0, playing_for = 0;
1753
1754     pa_assert(pd);
1755     pa_assert(o);
1756     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1757
1758     if (!o->context || !o->stream)
1759         goto finish;
1760
1761     i = &o->stream->timing_info;
1762
1763     o->stream->timing_info_valid = FALSE;
1764     i->write_index_corrupt = TRUE;
1765     i->read_index_corrupt = TRUE;
1766
1767     if (command != PA_COMMAND_REPLY) {
1768         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1769             goto finish;
1770
1771     } else {
1772
1773         if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1774             pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1775             pa_tagstruct_get_boolean(t, &playing) < 0 ||
1776             pa_tagstruct_get_timeval(t, &local) < 0 ||
1777             pa_tagstruct_get_timeval(t, &remote) < 0 ||
1778             pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1779             pa_tagstruct_gets64(t, &i->read_index) < 0) {
1780
1781             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1782             goto finish;
1783         }
1784
1785         if (o->context->version >= 13 &&
1786             o->stream->direction == PA_STREAM_PLAYBACK)
1787             if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1788                 pa_tagstruct_getu64(t, &playing_for) < 0) {
1789
1790                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1791                 goto finish;
1792             }
1793
1794
1795         if (!pa_tagstruct_eof(t)) {
1796             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1797             goto finish;
1798         }
1799         o->stream->timing_info_valid = TRUE;
1800         i->write_index_corrupt = FALSE;
1801         i->read_index_corrupt = FALSE;
1802
1803         i->playing = (int) playing;
1804         i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1805
1806         pa_gettimeofday(&now);
1807
1808         /* Calculate timestamps */
1809         if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1810             /* local and remote seem to have synchronized clocks */
1811
1812             if (o->stream->direction == PA_STREAM_PLAYBACK)
1813                 i->transport_usec = pa_timeval_diff(&remote, &local);
1814             else
1815                 i->transport_usec = pa_timeval_diff(&now, &remote);
1816
1817             i->synchronized_clocks = TRUE;
1818             i->timestamp = remote;
1819         } else {
1820             /* clocks are not synchronized, let's estimate latency then */
1821             i->transport_usec = pa_timeval_diff(&now, &local)/2;
1822             i->synchronized_clocks = FALSE;
1823             i->timestamp = local;
1824             pa_timeval_add(&i->timestamp, i->transport_usec);
1825         }
1826
1827         /* Invalidate read and write indexes if necessary */
1828         if (tag < o->stream->read_index_not_before)
1829             i->read_index_corrupt = TRUE;
1830
1831         if (tag < o->stream->write_index_not_before)
1832             i->write_index_corrupt = TRUE;
1833
1834         if (o->stream->direction == PA_STREAM_PLAYBACK) {
1835             /* Write index correction */
1836
1837             int n, j;
1838             uint32_t ctag = tag;
1839
1840             /* Go through the saved correction values and add up the
1841              * total correction.*/
1842             for (n = 0, j = o->stream->current_write_index_correction+1;
1843                  n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1844                  n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1845
1846                 /* Step over invalid data or out-of-date data */
1847                 if (!o->stream->write_index_corrections[j].valid ||
1848                     o->stream->write_index_corrections[j].tag < ctag)
1849                     continue;
1850
1851                 /* Make sure that everything is in order */
1852                 ctag = o->stream->write_index_corrections[j].tag+1;
1853
1854                 /* Now fix the write index */
1855                 if (o->stream->write_index_corrections[j].corrupt) {
1856                     /* A corrupting seek was made */
1857                     i->write_index_corrupt = TRUE;
1858                 } else if (o->stream->write_index_corrections[j].absolute) {
1859                     /* An absolute seek was made */
1860                     i->write_index = o->stream->write_index_corrections[j].value;
1861                     i->write_index_corrupt = FALSE;
1862                 } else if (!i->write_index_corrupt) {
1863                     /* A relative seek was made */
1864                     i->write_index += o->stream->write_index_corrections[j].value;
1865                 }
1866             }
1867
1868             /* Clear old correction entries */
1869             for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1870                 if (!o->stream->write_index_corrections[n].valid)
1871                     continue;
1872
1873                 if (o->stream->write_index_corrections[n].tag <= tag)
1874                     o->stream->write_index_corrections[n].valid = FALSE;
1875             }
1876         }
1877
1878         if (o->stream->direction == PA_STREAM_RECORD) {
1879             /* Read index correction */
1880
1881             if (!i->read_index_corrupt)
1882                 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1883         }
1884
1885         /* Update smoother if we're not corked */
1886         if (o->stream->smoother && !o->stream->corked) {
1887             pa_usec_t u, x;
1888
1889             u = x = pa_rtclock_now() - i->transport_usec;
1890
1891             if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1892                 pa_usec_t su;
1893
1894                 /* If we weren't playing then it will take some time
1895                  * until the audio will actually come out through the
1896                  * speakers. Since we follow that timing here, we need
1897                  * to try to fix this up */
1898
1899                 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1900
1901                 if (su < i->sink_usec)
1902                     x += i->sink_usec - su;
1903             }
1904
1905             if (!i->playing)
1906                 pa_smoother_pause(o->stream->smoother, x);
1907
1908             /* Update the smoother */
1909             if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1910                 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1911                 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1912
1913             if (i->playing)
1914                 pa_smoother_resume(o->stream->smoother, x, TRUE);
1915         }
1916     }
1917
1918     o->stream->auto_timing_update_requested = FALSE;
1919
1920     if (o->stream->latency_update_callback)
1921         o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1922
1923     if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1924         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1925         cb(o->stream, o->stream->timing_info_valid, o->userdata);
1926     }
1927
1928 finish:
1929
1930     pa_operation_done(o);
1931     pa_operation_unref(o);
1932 }
1933
1934 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1935     uint32_t tag;
1936     pa_operation *o;
1937     pa_tagstruct *t;
1938     struct timeval now;
1939     int cidx = 0;
1940
1941     pa_assert(s);
1942     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1943
1944     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1945     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1946     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1947
1948     if (s->direction == PA_STREAM_PLAYBACK) {
1949         /* Find a place to store the write_index correction data for this entry */
1950         cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1951
1952         /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1953         PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1954     }
1955     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1956
1957     t = pa_tagstruct_command(
1958             s->context,
1959             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1960             &tag);
1961     pa_tagstruct_putu32(t, s->channel);
1962     pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1963
1964     pa_pstream_send_tagstruct(s->context->pstream, t);
1965     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1966
1967     if (s->direction == PA_STREAM_PLAYBACK) {
1968         /* Fill in initial correction data */
1969
1970         s->current_write_index_correction = cidx;
1971
1972         s->write_index_corrections[cidx].valid = TRUE;
1973         s->write_index_corrections[cidx].absolute = FALSE;
1974         s->write_index_corrections[cidx].corrupt = FALSE;
1975         s->write_index_corrections[cidx].tag = tag;
1976         s->write_index_corrections[cidx].value = 0;
1977     }
1978
1979     return o;
1980 }
1981
1982 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1983     pa_stream *s = userdata;
1984
1985     pa_assert(pd);
1986     pa_assert(s);
1987     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1988
1989     pa_stream_ref(s);
1990
1991     if (command != PA_COMMAND_REPLY) {
1992         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1993             goto finish;
1994
1995         pa_stream_set_state(s, PA_STREAM_FAILED);
1996         goto finish;
1997     } else if (!pa_tagstruct_eof(t)) {
1998         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1999         goto finish;
2000     }
2001
2002     pa_stream_set_state(s, PA_STREAM_TERMINATED);
2003
2004 finish:
2005     pa_stream_unref(s);
2006 }
2007
2008 int pa_stream_disconnect(pa_stream *s) {
2009     pa_tagstruct *t;
2010     uint32_t tag;
2011
2012     pa_assert(s);
2013     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2014
2015     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2016     PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
2017     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
2018
2019     pa_stream_ref(s);
2020
2021     t = pa_tagstruct_command(
2022             s->context,
2023             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
2024                         (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
2025             &tag);
2026     pa_tagstruct_putu32(t, s->channel);
2027     pa_pstream_send_tagstruct(s->context->pstream, t);
2028     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
2029
2030     pa_stream_unref(s);
2031     return 0;
2032 }
2033
2034 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2035     pa_assert(s);
2036     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2037
2038     if (pa_detect_fork())
2039         return;
2040
2041     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2042         return;
2043
2044     s->read_callback = cb;
2045     s->read_userdata = userdata;
2046 }
2047
2048 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2049     pa_assert(s);
2050     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2051
2052     if (pa_detect_fork())
2053         return;
2054
2055     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2056         return;
2057
2058     s->write_callback = cb;
2059     s->write_userdata = userdata;
2060 }
2061
2062 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2063     pa_assert(s);
2064     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2065
2066     if (pa_detect_fork())
2067         return;
2068
2069     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2070         return;
2071
2072     s->state_callback = cb;
2073     s->state_userdata = userdata;
2074 }
2075
2076 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2077     pa_assert(s);
2078     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2079
2080     if (pa_detect_fork())
2081         return;
2082
2083     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2084         return;
2085
2086     s->overflow_callback = cb;
2087     s->overflow_userdata = userdata;
2088 }
2089
2090 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2091     pa_assert(s);
2092     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2093
2094     if (pa_detect_fork())
2095         return;
2096
2097     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2098         return;
2099
2100     s->underflow_callback = cb;
2101     s->underflow_userdata = userdata;
2102 }
2103
2104 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2105     pa_assert(s);
2106     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2107
2108     if (pa_detect_fork())
2109         return;
2110
2111     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2112         return;
2113
2114     s->latency_update_callback = cb;
2115     s->latency_update_userdata = userdata;
2116 }
2117
2118 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2119     pa_assert(s);
2120     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2121
2122     if (pa_detect_fork())
2123         return;
2124
2125     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2126         return;
2127
2128     s->moved_callback = cb;
2129     s->moved_userdata = userdata;
2130 }
2131
2132 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2133     pa_assert(s);
2134     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2135
2136     if (pa_detect_fork())
2137         return;
2138
2139     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2140         return;
2141
2142     s->suspended_callback = cb;
2143     s->suspended_userdata = userdata;
2144 }
2145
2146 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2147     pa_assert(s);
2148     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2149
2150     if (pa_detect_fork())
2151         return;
2152
2153     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2154         return;
2155
2156     s->started_callback = cb;
2157     s->started_userdata = userdata;
2158 }
2159
2160 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
2161     pa_assert(s);
2162     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2163
2164     if (pa_detect_fork())
2165         return;
2166
2167     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2168         return;
2169
2170     s->event_callback = cb;
2171     s->event_userdata = userdata;
2172 }
2173
2174 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2175     pa_assert(s);
2176     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2177
2178     if (pa_detect_fork())
2179         return;
2180
2181     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2182         return;
2183
2184     s->buffer_attr_callback = cb;
2185     s->buffer_attr_userdata = userdata;
2186 }
2187
2188 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2189     pa_operation *o = userdata;
2190     int success = 1;
2191
2192     pa_assert(pd);
2193     pa_assert(o);
2194     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2195
2196     if (!o->context)
2197         goto finish;
2198
2199     if (command != PA_COMMAND_REPLY) {
2200         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2201             goto finish;
2202
2203         success = 0;
2204     } else if (!pa_tagstruct_eof(t)) {
2205         pa_context_fail(o->context, PA_ERR_PROTOCOL);
2206         goto finish;
2207     }
2208
2209     if (o->callback) {
2210         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2211         cb(o->stream, success, o->userdata);
2212     }
2213
2214 finish:
2215     pa_operation_done(o);
2216     pa_operation_unref(o);
2217 }
2218
2219 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2220     pa_operation *o;
2221     pa_tagstruct *t;
2222     uint32_t tag;
2223
2224     pa_assert(s);
2225     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2226
2227     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2228     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2229     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2230
2231     /* Ask for a timing update before we cork/uncork to get the best
2232      * accuracy for the transport latency suitable for the
2233      * check_smoother_status() call in the started callback */
2234     request_auto_timing_update(s, TRUE);
2235
2236     s->corked = b;
2237
2238     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2239
2240     t = pa_tagstruct_command(
2241             s->context,
2242             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2243             &tag);
2244     pa_tagstruct_putu32(t, s->channel);
2245     pa_tagstruct_put_boolean(t, !!b);
2246     pa_pstream_send_tagstruct(s->context->pstream, t);
2247     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2248
2249     check_smoother_status(s, FALSE, FALSE, FALSE);
2250
2251     /* This might cause the indexes to hang/start again, hence let's
2252      * request a timing update, after the cork/uncork, too */
2253     request_auto_timing_update(s, TRUE);
2254
2255     return o;
2256 }
2257
2258 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2259     pa_tagstruct *t;
2260     pa_operation *o;
2261     uint32_t tag;
2262
2263     pa_assert(s);
2264     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2265
2266     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2267     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2268
2269     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2270
2271     t = pa_tagstruct_command(s->context, command, &tag);
2272     pa_tagstruct_putu32(t, s->channel);
2273     pa_pstream_send_tagstruct(s->context->pstream, t);
2274     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2275
2276     return o;
2277 }
2278
2279 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2280     pa_operation *o;
2281
2282     pa_assert(s);
2283     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2284
2285     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2286     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2287     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2288
2289     /* Ask for a timing update *before* the flush, so that the
2290      * transport usec is as up to date as possible when we get the
2291      * underflow message and update the smoother status*/
2292     request_auto_timing_update(s, TRUE);
2293
2294     if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2295         return NULL;
2296
2297     if (s->direction == PA_STREAM_PLAYBACK) {
2298
2299         if (s->write_index_corrections[s->current_write_index_correction].valid)
2300             s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2301
2302         if (s->buffer_attr.prebuf > 0)
2303             check_smoother_status(s, FALSE, FALSE, TRUE);
2304
2305         /* This will change the write index, but leave the
2306          * read index untouched. */
2307         invalidate_indexes(s, FALSE, TRUE);
2308
2309     } else
2310         /* For record streams this has no influence on the write
2311          * index, but the read index might jump. */
2312         invalidate_indexes(s, TRUE, FALSE);
2313
2314     /* Note that we do not update requested_bytes here. This is
2315      * because we cannot really know how data actually was dropped
2316      * from the write index due to this. This 'error' will be applied
2317      * by both client and server and hence we should be fine. */
2318
2319     return o;
2320 }
2321
2322 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2323     pa_operation *o;
2324
2325     pa_assert(s);
2326     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2327
2328     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2329     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2330     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2331     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2332
2333     /* Ask for a timing update before we cork/uncork to get the best
2334      * accuracy for the transport latency suitable for the
2335      * check_smoother_status() call in the started callback */
2336     request_auto_timing_update(s, TRUE);
2337
2338     if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2339         return NULL;
2340
2341     /* This might cause the read index to hang again, hence
2342      * let's request a timing update */
2343     request_auto_timing_update(s, TRUE);
2344
2345     return o;
2346 }
2347
2348 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2349     pa_operation *o;
2350
2351     pa_assert(s);
2352     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2353
2354     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2355     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2356     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2357     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2358
2359     /* Ask for a timing update before we cork/uncork to get the best
2360      * accuracy for the transport latency suitable for the
2361      * check_smoother_status() call in the started callback */
2362     request_auto_timing_update(s, TRUE);
2363
2364     if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2365         return NULL;
2366
2367     /* This might cause the read index to start moving again, hence
2368      * let's request a timing update */
2369     request_auto_timing_update(s, TRUE);
2370
2371     return o;
2372 }
2373
2374 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2375     pa_operation *o;
2376
2377     pa_assert(s);
2378     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2379     pa_assert(name);
2380
2381     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2382     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2383     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2384
2385     if (s->context->version >= 13) {
2386         pa_proplist *p = pa_proplist_new();
2387
2388         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2389         o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2390         pa_proplist_free(p);
2391     } else {
2392         pa_tagstruct *t;
2393         uint32_t tag;
2394
2395         o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2396         t = pa_tagstruct_command(
2397                 s->context,
2398                 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2399                 &tag);
2400         pa_tagstruct_putu32(t, s->channel);
2401         pa_tagstruct_puts(t, name);
2402         pa_pstream_send_tagstruct(s->context->pstream, t);
2403         pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2404     }
2405
2406     return o;
2407 }
2408
2409 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2410     pa_usec_t usec;
2411
2412     pa_assert(s);
2413     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2414
2415     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2416     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2417     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2418     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2419     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2420     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2421
2422     if (s->smoother)
2423         usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2424     else
2425         usec = calc_time(s, FALSE);
2426
2427     /* Make sure the time runs monotonically */
2428     if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2429         if (usec < s->previous_time)
2430             usec = s->previous_time;
2431         else
2432             s->previous_time = usec;
2433     }
2434
2435     if (r_usec)
2436         *r_usec = usec;
2437
2438     return 0;
2439 }
2440
2441 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2442     pa_assert(s);
2443     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2444
2445     if (negative)
2446         *negative = 0;
2447
2448     if (a >= b)
2449         return a-b;
2450     else {
2451         if (negative && s->direction == PA_STREAM_RECORD) {
2452             *negative = 1;
2453             return b-a;
2454         } else
2455             return 0;
2456     }
2457 }
2458
2459 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2460     pa_usec_t t, c;
2461     int r;
2462     int64_t cindex;
2463
2464     pa_assert(s);
2465     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2466     pa_assert(r_usec);
2467
2468     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2469     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2470     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2471     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2472     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2473     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2474
2475     if ((r = pa_stream_get_time(s, &t)) < 0)
2476         return r;
2477
2478     if (s->direction == PA_STREAM_PLAYBACK)
2479         cindex = s->timing_info.write_index;
2480     else
2481         cindex = s->timing_info.read_index;
2482
2483     if (cindex < 0)
2484         cindex = 0;
2485
2486     c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2487
2488     if (s->direction == PA_STREAM_PLAYBACK)
2489         *r_usec = time_counter_diff(s, c, t, negative);
2490     else
2491         *r_usec = time_counter_diff(s, t, c, negative);
2492
2493     return 0;
2494 }
2495
2496 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2497     pa_assert(s);
2498     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2499
2500     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2501     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2502     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2503     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2504
2505     return &s->timing_info;
2506 }
2507
2508 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2509     pa_assert(s);
2510     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2511
2512     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2513
2514     return &s->sample_spec;
2515 }
2516
2517 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2518     pa_assert(s);
2519     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2520
2521     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2522
2523     return &s->channel_map;
2524 }
2525
2526 const pa_format_info* pa_stream_get_format_info(pa_stream *s) {
2527     pa_assert(s);
2528     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2529
2530     /* We don't have the format till routing is done */
2531     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2532     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2533
2534     return s->format;
2535 }
2536 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2537     pa_assert(s);
2538     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2539
2540     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2541     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2542     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2543
2544     return &s->buffer_attr;
2545 }
2546
2547 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2548     pa_operation *o = userdata;
2549     int success = 1;
2550
2551     pa_assert(pd);
2552     pa_assert(o);
2553     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2554
2555     if (!o->context)
2556         goto finish;
2557
2558     if (command != PA_COMMAND_REPLY) {
2559         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2560             goto finish;
2561
2562         success = 0;
2563     } else {
2564         if (o->stream->direction == PA_STREAM_PLAYBACK) {
2565             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2566                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2567                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2568                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2569                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2570                 goto finish;
2571             }
2572         } else if (o->stream->direction == PA_STREAM_RECORD) {
2573             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2574                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2575                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2576                 goto finish;
2577             }
2578         }
2579
2580         if (o->stream->context->version >= 13) {
2581             pa_usec_t usec;
2582
2583             if (pa_tagstruct_get_usec(t, &usec) < 0) {
2584                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2585                 goto finish;
2586             }
2587
2588             if (o->stream->direction == PA_STREAM_RECORD)
2589                 o->stream->timing_info.configured_source_usec = usec;
2590             else
2591                 o->stream->timing_info.configured_sink_usec = usec;
2592         }
2593
2594         if (!pa_tagstruct_eof(t)) {
2595             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2596             goto finish;
2597         }
2598     }
2599
2600     if (o->callback) {
2601         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2602         cb(o->stream, success, o->userdata);
2603     }
2604
2605 finish:
2606     pa_operation_done(o);
2607     pa_operation_unref(o);
2608 }
2609
2610
2611 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2612     pa_operation *o;
2613     pa_tagstruct *t;
2614     uint32_t tag;
2615     pa_buffer_attr copy;
2616
2617     pa_assert(s);
2618     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2619     pa_assert(attr);
2620
2621     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2622     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2623     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2624     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2625
2626     /* Ask for a timing update before we cork/uncork to get the best
2627      * accuracy for the transport latency suitable for the
2628      * check_smoother_status() call in the started callback */
2629     request_auto_timing_update(s, TRUE);
2630
2631     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2632
2633     t = pa_tagstruct_command(
2634             s->context,
2635             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2636             &tag);
2637     pa_tagstruct_putu32(t, s->channel);
2638
2639     copy = *attr;
2640     patch_buffer_attr(s, &copy, NULL);
2641     attr = &copy;
2642
2643     pa_tagstruct_putu32(t, attr->maxlength);
2644
2645     if (s->direction == PA_STREAM_PLAYBACK)
2646         pa_tagstruct_put(
2647                 t,
2648                 PA_TAG_U32, attr->tlength,
2649                 PA_TAG_U32, attr->prebuf,
2650                 PA_TAG_U32, attr->minreq,
2651                 PA_TAG_INVALID);
2652     else
2653         pa_tagstruct_putu32(t, attr->fragsize);
2654
2655     if (s->context->version >= 13)
2656         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2657
2658     if (s->context->version >= 14)
2659         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2660
2661     pa_pstream_send_tagstruct(s->context->pstream, t);
2662     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2663
2664     /* This might cause changes in the read/write index, hence let's
2665      * request a timing update */
2666     request_auto_timing_update(s, TRUE);
2667
2668     return o;
2669 }
2670
2671 uint32_t pa_stream_get_device_index(pa_stream *s) {
2672     pa_assert(s);
2673     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2674
2675     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2676     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2677     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2678     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2679     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2680
2681     return s->device_index;
2682 }
2683
2684 const char *pa_stream_get_device_name(pa_stream *s) {
2685     pa_assert(s);
2686     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2687
2688     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2689     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2690     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2691     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2692     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2693
2694     return s->device_name;
2695 }
2696
2697 int pa_stream_is_suspended(pa_stream *s) {
2698     pa_assert(s);
2699     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2700
2701     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2702     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2703     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2704     PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2705
2706     return s->suspended;
2707 }
2708
2709 int pa_stream_is_corked(pa_stream *s) {
2710     pa_assert(s);
2711     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2712
2713     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2714     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2715     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2716
2717     return s->corked;
2718 }
2719
2720 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2721     pa_operation *o = userdata;
2722     int success = 1;
2723
2724     pa_assert(pd);
2725     pa_assert(o);
2726     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2727
2728     if (!o->context)
2729         goto finish;
2730
2731     if (command != PA_COMMAND_REPLY) {
2732         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2733             goto finish;
2734
2735         success = 0;
2736     } else {
2737
2738         if (!pa_tagstruct_eof(t)) {
2739             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2740             goto finish;
2741         }
2742     }
2743
2744     o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2745     pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2746
2747     if (o->callback) {
2748         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2749         cb(o->stream, success, o->userdata);
2750     }
2751
2752 finish:
2753     pa_operation_done(o);
2754     pa_operation_unref(o);
2755 }
2756
2757
2758 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2759     pa_operation *o;
2760     pa_tagstruct *t;
2761     uint32_t tag;
2762
2763     pa_assert(s);
2764     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2765
2766     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2767     PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2768     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2769     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2770     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2771     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2772
2773     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2774     o->private = PA_UINT_TO_PTR(rate);
2775
2776     t = pa_tagstruct_command(
2777             s->context,
2778             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2779             &tag);
2780     pa_tagstruct_putu32(t, s->channel);
2781     pa_tagstruct_putu32(t, rate);
2782
2783     pa_pstream_send_tagstruct(s->context->pstream, t);
2784     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2785
2786     return o;
2787 }
2788
2789 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2790     pa_operation *o;
2791     pa_tagstruct *t;
2792     uint32_t tag;
2793
2794     pa_assert(s);
2795     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2796
2797     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2798     PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2799     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2800     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2801     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2802
2803     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2804
2805     t = pa_tagstruct_command(
2806             s->context,
2807             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2808             &tag);
2809     pa_tagstruct_putu32(t, s->channel);
2810     pa_tagstruct_putu32(t, (uint32_t) mode);
2811     pa_tagstruct_put_proplist(t, p);
2812
2813     pa_pstream_send_tagstruct(s->context->pstream, t);
2814     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2815
2816     /* Please note that we don't update s->proplist here, because we
2817      * don't export that field */
2818
2819     return o;
2820 }
2821
2822 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2823     pa_operation *o;
2824     pa_tagstruct *t;
2825     uint32_t tag;
2826     const char * const*k;
2827
2828     pa_assert(s);
2829     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2830
2831     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2832     PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2833     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2834     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2835     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2836
2837     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2838
2839     t = pa_tagstruct_command(
2840             s->context,
2841             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2842             &tag);
2843     pa_tagstruct_putu32(t, s->channel);
2844
2845     for (k = keys; *k; k++)
2846         pa_tagstruct_puts(t, *k);
2847
2848     pa_tagstruct_puts(t, NULL);
2849
2850     pa_pstream_send_tagstruct(s->context->pstream, t);
2851     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2852
2853     /* Please note that we don't update s->proplist here, because we
2854      * don't export that field */
2855
2856     return o;
2857 }
2858
2859 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2860     pa_assert(s);
2861     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2862
2863     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2864     PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2865     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2866     PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2867
2868     s->direct_on_input = sink_input_idx;
2869
2870     return 0;
2871 }
2872
2873 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2874     pa_assert(s);
2875     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2876
2877     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2878     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2879     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2880
2881     return s->direct_on_input;
2882 }