More spelling fixes
[profile/ivi/pulseaudio.git] / src / pulse / stream.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/rtclock.h>
34 #include <pulse/xmalloc.h>
35 #include <pulse/fork-detect.h>
36
37 #include <pulsecore/pstream-util.h>
38 #include <pulsecore/log.h>
39 #include <pulsecore/hashmap.h>
40 #include <pulsecore/macro.h>
41 #include <pulsecore/core-rtclock.h>
42 #include <pulsecore/core-util.h>
43
44 #include "internal.h"
45 #include "stream.h"
46
47 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
48 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
49
50 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
51 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
52 #define SMOOTHER_MIN_HISTORY (4)
53
54 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
55     return pa_stream_new_with_proplist(c, name, ss, map, NULL);
56 }
57
58 static void reset_callbacks(pa_stream *s) {
59     s->read_callback = NULL;
60     s->read_userdata = NULL;
61     s->write_callback = NULL;
62     s->write_userdata = NULL;
63     s->state_callback = NULL;
64     s->state_userdata = NULL;
65     s->overflow_callback = NULL;
66     s->overflow_userdata = NULL;
67     s->underflow_callback = NULL;
68     s->underflow_userdata = NULL;
69     s->latency_update_callback = NULL;
70     s->latency_update_userdata = NULL;
71     s->moved_callback = NULL;
72     s->moved_userdata = NULL;
73     s->suspended_callback = NULL;
74     s->suspended_userdata = NULL;
75     s->started_callback = NULL;
76     s->started_userdata = NULL;
77     s->event_callback = NULL;
78     s->event_userdata = NULL;
79     s->buffer_attr_callback = NULL;
80     s->buffer_attr_userdata = NULL;
81 }
82
83 static pa_stream *pa_stream_new_with_proplist_internal(
84         pa_context *c,
85         const char *name,
86         const pa_sample_spec *ss,
87         const pa_channel_map *map,
88         pa_format_info * const *formats,
89         unsigned int n_formats,
90         pa_proplist *p) {
91
92     pa_stream *s;
93     unsigned int i;
94
95     pa_assert(c);
96     pa_assert(PA_REFCNT_VALUE(c) >= 1);
97     pa_assert((ss == NULL && map == NULL) || (formats == NULL && n_formats == 0));
98     pa_assert(n_formats < PA_MAX_FORMATS);
99
100     PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
101     PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
102
103     s = pa_xnew(pa_stream, 1);
104     PA_REFCNT_INIT(s);
105     s->context = c;
106     s->mainloop = c->mainloop;
107
108     s->direction = PA_STREAM_NODIRECTION;
109     s->state = PA_STREAM_UNCONNECTED;
110     s->flags = 0;
111
112     if (ss)
113         s->sample_spec = *ss;
114     else
115         s->sample_spec.format = PA_SAMPLE_INVALID;
116
117     if (map)
118         s->channel_map = *map;
119     else
120         pa_channel_map_init(&s->channel_map);
121
122     s->n_formats = 0;
123     if (formats) {
124         s->n_formats = n_formats;
125         for (i = 0; i < n_formats; i++)
126             s->req_formats[i] = pa_format_info_copy(formats[i]);
127     }
128
129     /* We'll get the final negotiated format after connecting */
130     s->format = NULL;
131
132     s->direct_on_input = PA_INVALID_INDEX;
133
134     s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
135     if (name)
136         pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
137
138     s->channel = 0;
139     s->channel_valid = FALSE;
140     s->syncid = c->csyncid++;
141     s->stream_index = PA_INVALID_INDEX;
142
143     s->requested_bytes = 0;
144     memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
145
146     /* We initialize der target length here, so that if the user
147      * passes no explicit buffering metrics the default is similar to
148      * what older PA versions provided. */
149
150     s->buffer_attr.maxlength = (uint32_t) -1;
151     if (ss)
152         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
153     else {
154         /* FIXME: We assume a worst-case compressed format corresponding to
155          * 48000 Hz, 2 ch, S16 PCM, but this can very well be incorrect */
156         pa_sample_spec tmp_ss = {
157             .format   = PA_SAMPLE_S16NE,
158             .rate     = 48000,
159             .channels = 2,
160         };
161         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &tmp_ss); /* 250ms of buffering */
162     }
163     s->buffer_attr.minreq = (uint32_t) -1;
164     s->buffer_attr.prebuf = (uint32_t) -1;
165     s->buffer_attr.fragsize = (uint32_t) -1;
166
167     s->device_index = PA_INVALID_INDEX;
168     s->device_name = NULL;
169     s->suspended = FALSE;
170     s->corked = FALSE;
171
172     s->write_memblock = NULL;
173     s->write_data = NULL;
174
175     pa_memchunk_reset(&s->peek_memchunk);
176     s->peek_data = NULL;
177     s->record_memblockq = NULL;
178
179     memset(&s->timing_info, 0, sizeof(s->timing_info));
180     s->timing_info_valid = FALSE;
181
182     s->previous_time = 0;
183     s->latest_underrun_at_index = -1;
184
185     s->read_index_not_before = 0;
186     s->write_index_not_before = 0;
187     for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
188         s->write_index_corrections[i].valid = 0;
189     s->current_write_index_correction = 0;
190
191     s->auto_timing_update_event = NULL;
192     s->auto_timing_update_requested = FALSE;
193     s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
194
195     reset_callbacks(s);
196
197     s->smoother = NULL;
198
199     /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
200     PA_LLIST_PREPEND(pa_stream, c->streams, s);
201     pa_stream_ref(s);
202
203     return s;
204 }
205
206 pa_stream *pa_stream_new_with_proplist(
207         pa_context *c,
208         const char *name,
209         const pa_sample_spec *ss,
210         const pa_channel_map *map,
211         pa_proplist *p) {
212
213     pa_channel_map tmap;
214
215     PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
216     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
217     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
218     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
219     PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
220
221     if (!map)
222         PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
223
224     return pa_stream_new_with_proplist_internal(c, name, ss, map, NULL, 0, p);
225 }
226
227 pa_stream *pa_stream_new_extended(
228         pa_context *c,
229         const char *name,
230         pa_format_info * const *formats,
231         unsigned int n_formats,
232         pa_proplist *p) {
233
234     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 21, PA_ERR_NOTSUPPORTED);
235
236     return pa_stream_new_with_proplist_internal(c, name, NULL, NULL, formats, n_formats, p);
237 }
238
239 static void stream_unlink(pa_stream *s) {
240     pa_operation *o, *n;
241     pa_assert(s);
242
243     if (!s->context)
244         return;
245
246     /* Detach from context */
247
248     /* Unref all operation object that point to us */
249     for (o = s->context->operations; o; o = n) {
250         n = o->next;
251
252         if (o->stream == s)
253             pa_operation_cancel(o);
254     }
255
256     /* Drop all outstanding replies for this stream */
257     if (s->context->pdispatch)
258         pa_pdispatch_unregister_reply(s->context->pdispatch, s);
259
260     if (s->channel_valid) {
261         pa_hashmap_remove((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, PA_UINT32_TO_PTR(s->channel));
262         s->channel = 0;
263         s->channel_valid = FALSE;
264     }
265
266     PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
267     pa_stream_unref(s);
268
269     s->context = NULL;
270
271     if (s->auto_timing_update_event) {
272         pa_assert(s->mainloop);
273         s->mainloop->time_free(s->auto_timing_update_event);
274     }
275
276     reset_callbacks(s);
277 }
278
279 static void stream_free(pa_stream *s) {
280     unsigned int i;
281
282     pa_assert(s);
283
284     stream_unlink(s);
285
286     if (s->write_memblock) {
287         pa_memblock_release(s->write_memblock);
288         pa_memblock_unref(s->write_data);
289     }
290
291     if (s->peek_memchunk.memblock) {
292         if (s->peek_data)
293             pa_memblock_release(s->peek_memchunk.memblock);
294         pa_memblock_unref(s->peek_memchunk.memblock);
295     }
296
297     if (s->record_memblockq)
298         pa_memblockq_free(s->record_memblockq);
299
300     if (s->proplist)
301         pa_proplist_free(s->proplist);
302
303     if (s->smoother)
304         pa_smoother_free(s->smoother);
305
306     for (i = 0; i < s->n_formats; i++)
307         pa_format_info_free(s->req_formats[i]);
308
309     if (s->format)
310         pa_format_info_free(s->format);
311
312     pa_xfree(s->device_name);
313     pa_xfree(s);
314 }
315
316 void pa_stream_unref(pa_stream *s) {
317     pa_assert(s);
318     pa_assert(PA_REFCNT_VALUE(s) >= 1);
319
320     if (PA_REFCNT_DEC(s) <= 0)
321         stream_free(s);
322 }
323
324 pa_stream* pa_stream_ref(pa_stream *s) {
325     pa_assert(s);
326     pa_assert(PA_REFCNT_VALUE(s) >= 1);
327
328     PA_REFCNT_INC(s);
329     return s;
330 }
331
332 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
333     pa_assert(s);
334     pa_assert(PA_REFCNT_VALUE(s) >= 1);
335
336     return s->state;
337 }
338
339 pa_context* pa_stream_get_context(pa_stream *s) {
340     pa_assert(s);
341     pa_assert(PA_REFCNT_VALUE(s) >= 1);
342
343     return s->context;
344 }
345
346 uint32_t pa_stream_get_index(pa_stream *s) {
347     pa_assert(s);
348     pa_assert(PA_REFCNT_VALUE(s) >= 1);
349
350     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
351     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
352
353     return s->stream_index;
354 }
355
356 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
357     pa_assert(s);
358     pa_assert(PA_REFCNT_VALUE(s) >= 1);
359
360     if (s->state == st)
361         return;
362
363     pa_stream_ref(s);
364
365     s->state = st;
366
367     if (s->state_callback)
368         s->state_callback(s, s->state_userdata);
369
370     if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
371         stream_unlink(s);
372
373     pa_stream_unref(s);
374 }
375
376 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
377     pa_assert(s);
378     pa_assert(PA_REFCNT_VALUE(s) >= 1);
379
380     if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
381         return;
382
383     if (s->state == PA_STREAM_READY &&
384         (force || !s->auto_timing_update_requested)) {
385         pa_operation *o;
386
387 /*         pa_log("Automatically requesting new timing data"); */
388
389         if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
390             pa_operation_unref(o);
391             s->auto_timing_update_requested = TRUE;
392         }
393     }
394
395     if (s->auto_timing_update_event) {
396         if (s->suspended && !force) {
397             pa_assert(s->mainloop);
398             s->mainloop->time_free(s->auto_timing_update_event);
399             s->auto_timing_update_event = NULL;
400         } else {
401             if (force)
402                 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
403
404             pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
405
406             s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
407         }
408     }
409 }
410
411 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
412     pa_context *c = userdata;
413     pa_stream *s;
414     uint32_t channel;
415
416     pa_assert(pd);
417     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
418     pa_assert(t);
419     pa_assert(c);
420     pa_assert(PA_REFCNT_VALUE(c) >= 1);
421
422     pa_context_ref(c);
423
424     if (pa_tagstruct_getu32(t, &channel) < 0 ||
425         !pa_tagstruct_eof(t)) {
426         pa_context_fail(c, PA_ERR_PROTOCOL);
427         goto finish;
428     }
429
430     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
431         goto finish;
432
433     if (s->state != PA_STREAM_READY)
434         goto finish;
435
436     pa_context_set_error(c, PA_ERR_KILLED);
437     pa_stream_set_state(s, PA_STREAM_FAILED);
438
439 finish:
440     pa_context_unref(c);
441 }
442
443 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
444     pa_usec_t x;
445
446     pa_assert(s);
447     pa_assert(!force_start || !force_stop);
448
449     if (!s->smoother)
450         return;
451
452     x = pa_rtclock_now();
453
454     if (s->timing_info_valid) {
455         if (aposteriori)
456             x -= s->timing_info.transport_usec;
457         else
458             x += s->timing_info.transport_usec;
459     }
460
461     if (s->suspended || s->corked || force_stop)
462         pa_smoother_pause(s->smoother, x);
463     else if (force_start || s->buffer_attr.prebuf == 0) {
464
465         if (!s->timing_info_valid &&
466             !aposteriori &&
467             !force_start &&
468             !force_stop &&
469             s->context->version >= 13) {
470
471             /* If the server supports STARTED events we take them as
472              * indications when audio really starts/stops playing, if
473              * we don't have any timing info yet -- instead of trying
474              * to be smart and guessing the server time. Otherwise the
475              * unknown transport delay add too much noise to our time
476              * calculations. */
477
478             return;
479         }
480
481         pa_smoother_resume(s->smoother, x, TRUE);
482     }
483
484     /* Please note that we have no idea if playback actually started
485      * if prebuf is non-zero! */
486 }
487
488 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata);
489
490 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
491     pa_context *c = userdata;
492     pa_stream *s;
493     uint32_t channel;
494     const char *dn;
495     pa_bool_t suspended;
496     uint32_t di;
497     pa_usec_t usec = 0;
498     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
499
500     pa_assert(pd);
501     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
502     pa_assert(t);
503     pa_assert(c);
504     pa_assert(PA_REFCNT_VALUE(c) >= 1);
505
506     pa_context_ref(c);
507
508     if (c->version < 12) {
509         pa_context_fail(c, PA_ERR_PROTOCOL);
510         goto finish;
511     }
512
513     if (pa_tagstruct_getu32(t, &channel) < 0 ||
514         pa_tagstruct_getu32(t, &di) < 0 ||
515         pa_tagstruct_gets(t, &dn) < 0 ||
516         pa_tagstruct_get_boolean(t, &suspended) < 0) {
517         pa_context_fail(c, PA_ERR_PROTOCOL);
518         goto finish;
519     }
520
521     if (c->version >= 13) {
522
523         if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
524             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
525                 pa_tagstruct_getu32(t, &fragsize) < 0 ||
526                 pa_tagstruct_get_usec(t, &usec) < 0) {
527                 pa_context_fail(c, PA_ERR_PROTOCOL);
528                 goto finish;
529             }
530         } else {
531             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
532                 pa_tagstruct_getu32(t, &tlength) < 0 ||
533                 pa_tagstruct_getu32(t, &prebuf) < 0 ||
534                 pa_tagstruct_getu32(t, &minreq) < 0 ||
535                 pa_tagstruct_get_usec(t, &usec) < 0) {
536                 pa_context_fail(c, PA_ERR_PROTOCOL);
537                 goto finish;
538             }
539         }
540     }
541
542     if (!pa_tagstruct_eof(t)) {
543         pa_context_fail(c, PA_ERR_PROTOCOL);
544         goto finish;
545     }
546
547     if (!dn || di == PA_INVALID_INDEX) {
548         pa_context_fail(c, PA_ERR_PROTOCOL);
549         goto finish;
550     }
551
552     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
553         goto finish;
554
555     if (s->state != PA_STREAM_READY)
556         goto finish;
557
558     if (c->version >= 13) {
559         if (s->direction == PA_STREAM_RECORD)
560             s->timing_info.configured_source_usec = usec;
561         else
562             s->timing_info.configured_sink_usec = usec;
563
564         s->buffer_attr.maxlength = maxlength;
565         s->buffer_attr.fragsize = fragsize;
566         s->buffer_attr.tlength = tlength;
567         s->buffer_attr.prebuf = prebuf;
568         s->buffer_attr.minreq = minreq;
569     }
570
571     pa_xfree(s->device_name);
572     s->device_name = pa_xstrdup(dn);
573     s->device_index = di;
574
575     s->suspended = suspended;
576
577     if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
578         s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
579         s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
580         request_auto_timing_update(s, TRUE);
581     }
582
583     check_smoother_status(s, TRUE, FALSE, FALSE);
584     request_auto_timing_update(s, TRUE);
585
586     if (s->moved_callback)
587         s->moved_callback(s, s->moved_userdata);
588
589 finish:
590     pa_context_unref(c);
591 }
592
593 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
594     pa_context *c = userdata;
595     pa_stream *s;
596     uint32_t channel;
597     pa_usec_t usec = 0;
598     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
599
600     pa_assert(pd);
601     pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
602     pa_assert(t);
603     pa_assert(c);
604     pa_assert(PA_REFCNT_VALUE(c) >= 1);
605
606     pa_context_ref(c);
607
608     if (c->version < 15) {
609         pa_context_fail(c, PA_ERR_PROTOCOL);
610         goto finish;
611     }
612
613     if (pa_tagstruct_getu32(t, &channel) < 0) {
614         pa_context_fail(c, PA_ERR_PROTOCOL);
615         goto finish;
616     }
617
618     if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
619         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
620             pa_tagstruct_getu32(t, &fragsize) < 0 ||
621             pa_tagstruct_get_usec(t, &usec) < 0) {
622             pa_context_fail(c, PA_ERR_PROTOCOL);
623             goto finish;
624         }
625     } else {
626         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
627             pa_tagstruct_getu32(t, &tlength) < 0 ||
628             pa_tagstruct_getu32(t, &prebuf) < 0 ||
629             pa_tagstruct_getu32(t, &minreq) < 0 ||
630             pa_tagstruct_get_usec(t, &usec) < 0) {
631             pa_context_fail(c, PA_ERR_PROTOCOL);
632             goto finish;
633         }
634     }
635
636     if (!pa_tagstruct_eof(t)) {
637         pa_context_fail(c, PA_ERR_PROTOCOL);
638         goto finish;
639     }
640
641     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
642         goto finish;
643
644     if (s->state != PA_STREAM_READY)
645         goto finish;
646
647     if (s->direction == PA_STREAM_RECORD)
648         s->timing_info.configured_source_usec = usec;
649     else
650         s->timing_info.configured_sink_usec = usec;
651
652     s->buffer_attr.maxlength = maxlength;
653     s->buffer_attr.fragsize = fragsize;
654     s->buffer_attr.tlength = tlength;
655     s->buffer_attr.prebuf = prebuf;
656     s->buffer_attr.minreq = minreq;
657
658     request_auto_timing_update(s, TRUE);
659
660     if (s->buffer_attr_callback)
661         s->buffer_attr_callback(s, s->buffer_attr_userdata);
662
663 finish:
664     pa_context_unref(c);
665 }
666
667 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
668     pa_context *c = userdata;
669     pa_stream *s;
670     uint32_t channel;
671     pa_bool_t suspended;
672
673     pa_assert(pd);
674     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
675     pa_assert(t);
676     pa_assert(c);
677     pa_assert(PA_REFCNT_VALUE(c) >= 1);
678
679     pa_context_ref(c);
680
681     if (c->version < 12) {
682         pa_context_fail(c, PA_ERR_PROTOCOL);
683         goto finish;
684     }
685
686     if (pa_tagstruct_getu32(t, &channel) < 0 ||
687         pa_tagstruct_get_boolean(t, &suspended) < 0 ||
688         !pa_tagstruct_eof(t)) {
689         pa_context_fail(c, PA_ERR_PROTOCOL);
690         goto finish;
691     }
692
693     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
694         goto finish;
695
696     if (s->state != PA_STREAM_READY)
697         goto finish;
698
699     s->suspended = suspended;
700
701     if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
702         s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
703         s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
704         request_auto_timing_update(s, TRUE);
705     }
706
707     check_smoother_status(s, TRUE, FALSE, FALSE);
708     request_auto_timing_update(s, TRUE);
709
710     if (s->suspended_callback)
711         s->suspended_callback(s, s->suspended_userdata);
712
713 finish:
714     pa_context_unref(c);
715 }
716
717 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
718     pa_context *c = userdata;
719     pa_stream *s;
720     uint32_t channel;
721
722     pa_assert(pd);
723     pa_assert(command == PA_COMMAND_STARTED);
724     pa_assert(t);
725     pa_assert(c);
726     pa_assert(PA_REFCNT_VALUE(c) >= 1);
727
728     pa_context_ref(c);
729
730     if (c->version < 13) {
731         pa_context_fail(c, PA_ERR_PROTOCOL);
732         goto finish;
733     }
734
735     if (pa_tagstruct_getu32(t, &channel) < 0 ||
736         !pa_tagstruct_eof(t)) {
737         pa_context_fail(c, PA_ERR_PROTOCOL);
738         goto finish;
739     }
740
741     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
742         goto finish;
743
744     if (s->state != PA_STREAM_READY)
745         goto finish;
746
747     check_smoother_status(s, TRUE, TRUE, FALSE);
748     request_auto_timing_update(s, TRUE);
749
750     if (s->started_callback)
751         s->started_callback(s, s->started_userdata);
752
753 finish:
754     pa_context_unref(c);
755 }
756
757 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
758     pa_context *c = userdata;
759     pa_stream *s;
760     uint32_t channel;
761     pa_proplist *pl = NULL;
762     const char *event;
763
764     pa_assert(pd);
765     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
766     pa_assert(t);
767     pa_assert(c);
768     pa_assert(PA_REFCNT_VALUE(c) >= 1);
769
770     pa_context_ref(c);
771
772     if (c->version < 15) {
773         pa_context_fail(c, PA_ERR_PROTOCOL);
774         goto finish;
775     }
776
777     pl = pa_proplist_new();
778
779     if (pa_tagstruct_getu32(t, &channel) < 0 ||
780         pa_tagstruct_gets(t, &event) < 0 ||
781         pa_tagstruct_get_proplist(t, pl) < 0 ||
782         !pa_tagstruct_eof(t) || !event) {
783         pa_context_fail(c, PA_ERR_PROTOCOL);
784         goto finish;
785     }
786
787     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
788         goto finish;
789
790     if (s->state != PA_STREAM_READY)
791         goto finish;
792
793     if (pa_streq(event, PA_STREAM_EVENT_FORMAT_LOST)) {
794         /* Let client know what the running time was when the stream had to be killed  */
795         pa_usec_t stream_time;
796         if (pa_stream_get_time(s, &stream_time) == 0)
797             pa_proplist_setf(pl, "stream-time", "%llu", (unsigned long long) stream_time);
798     }
799
800     if (s->event_callback)
801         s->event_callback(s, event, pl, s->event_userdata);
802
803 finish:
804     pa_context_unref(c);
805
806     if (pl)
807         pa_proplist_free(pl);
808 }
809
810 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
811     pa_stream *s;
812     pa_context *c = userdata;
813     uint32_t bytes, channel;
814
815     pa_assert(pd);
816     pa_assert(command == PA_COMMAND_REQUEST);
817     pa_assert(t);
818     pa_assert(c);
819     pa_assert(PA_REFCNT_VALUE(c) >= 1);
820
821     pa_context_ref(c);
822
823     if (pa_tagstruct_getu32(t, &channel) < 0 ||
824         pa_tagstruct_getu32(t, &bytes) < 0 ||
825         !pa_tagstruct_eof(t)) {
826         pa_context_fail(c, PA_ERR_PROTOCOL);
827         goto finish;
828     }
829
830     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
831         goto finish;
832
833     if (s->state != PA_STREAM_READY)
834         goto finish;
835
836     s->requested_bytes += bytes;
837
838     /* pa_log("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes); */
839
840     if (s->requested_bytes > 0 && s->write_callback)
841         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
842
843 finish:
844     pa_context_unref(c);
845 }
846
847 int64_t pa_stream_get_underflow_index(pa_stream *p)
848 {
849     pa_assert(p);
850     return p->latest_underrun_at_index;
851 }
852
853 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
854     pa_stream *s;
855     pa_context *c = userdata;
856     uint32_t channel;
857     int64_t offset = -1;
858
859     pa_assert(pd);
860     pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
861     pa_assert(t);
862     pa_assert(c);
863     pa_assert(PA_REFCNT_VALUE(c) >= 1);
864
865     pa_context_ref(c);
866
867     if (pa_tagstruct_getu32(t, &channel) < 0) {
868         pa_context_fail(c, PA_ERR_PROTOCOL);
869         goto finish;
870     }
871
872     if (c->version >= 23 && command == PA_COMMAND_UNDERFLOW) {
873         if (pa_tagstruct_gets64(t, &offset) < 0) {
874             pa_context_fail(c, PA_ERR_PROTOCOL);
875             goto finish;
876         }
877     }
878
879     if (!pa_tagstruct_eof(t)) {
880         pa_context_fail(c, PA_ERR_PROTOCOL);
881         goto finish;
882     }
883
884     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
885         goto finish;
886
887     if (s->state != PA_STREAM_READY)
888         goto finish;
889
890     if (offset != -1)
891         s->latest_underrun_at_index = offset;
892
893     if (s->buffer_attr.prebuf > 0)
894         check_smoother_status(s, TRUE, FALSE, TRUE);
895
896     request_auto_timing_update(s, TRUE);
897
898     if (command == PA_COMMAND_OVERFLOW) {
899         if (s->overflow_callback)
900             s->overflow_callback(s, s->overflow_userdata);
901     } else if (command == PA_COMMAND_UNDERFLOW) {
902         if (s->underflow_callback)
903             s->underflow_callback(s, s->underflow_userdata);
904     }
905
906 finish:
907     pa_context_unref(c);
908 }
909
910 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
911     pa_assert(s);
912     pa_assert(PA_REFCNT_VALUE(s) >= 1);
913
914 /*     pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
915
916     if (s->state != PA_STREAM_READY)
917         return;
918
919     if (w) {
920         s->write_index_not_before = s->context->ctag;
921
922         if (s->timing_info_valid)
923             s->timing_info.write_index_corrupt = TRUE;
924
925 /*         pa_log("write_index invalidated"); */
926     }
927
928     if (r) {
929         s->read_index_not_before = s->context->ctag;
930
931         if (s->timing_info_valid)
932             s->timing_info.read_index_corrupt = TRUE;
933
934 /*         pa_log("read_index invalidated"); */
935     }
936
937     request_auto_timing_update(s, TRUE);
938 }
939
940 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
941     pa_stream *s = userdata;
942
943     pa_assert(s);
944     pa_assert(PA_REFCNT_VALUE(s) >= 1);
945
946     pa_stream_ref(s);
947     request_auto_timing_update(s, FALSE);
948     pa_stream_unref(s);
949 }
950
951 static void create_stream_complete(pa_stream *s) {
952     pa_assert(s);
953     pa_assert(PA_REFCNT_VALUE(s) >= 1);
954     pa_assert(s->state == PA_STREAM_CREATING);
955
956     pa_stream_set_state(s, PA_STREAM_READY);
957
958     if (s->requested_bytes > 0 && s->write_callback)
959         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
960
961     if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
962         s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
963         pa_assert(!s->auto_timing_update_event);
964         s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
965
966         request_auto_timing_update(s, TRUE);
967     }
968
969     check_smoother_status(s, TRUE, FALSE, FALSE);
970 }
971
972 static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
973     const char *e;
974
975     pa_assert(s);
976     pa_assert(attr);
977
978     if ((e = getenv("PULSE_LATENCY_MSEC"))) {
979         uint32_t ms;
980
981         if (pa_atou(e, &ms) < 0 || ms <= 0)
982             pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
983         else {
984             attr->maxlength = (uint32_t) -1;
985             attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &s->sample_spec);
986             attr->minreq = (uint32_t) -1;
987             attr->prebuf = (uint32_t) -1;
988             attr->fragsize = attr->tlength;
989         }
990
991         if (flags)
992             *flags |= PA_STREAM_ADJUST_LATENCY;
993     }
994
995     if (s->context->version >= 13)
996         return;
997
998     /* Version older than 0.9.10 didn't do server side buffer_attr
999      * selection, hence we have to fake it on the client side. */
1000
1001     /* We choose fairly conservative values here, to not confuse
1002      * old clients with extremely large playback buffers */
1003
1004     if (attr->maxlength == (uint32_t) -1)
1005         attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
1006
1007     if (attr->tlength == (uint32_t) -1)
1008         attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */
1009
1010     if (attr->minreq == (uint32_t) -1)
1011         attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
1012
1013     if (attr->prebuf == (uint32_t) -1)
1014         attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
1015
1016     if (attr->fragsize == (uint32_t) -1)
1017         attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
1018 }
1019
1020 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1021     pa_stream *s = userdata;
1022     uint32_t requested_bytes = 0;
1023
1024     pa_assert(pd);
1025     pa_assert(s);
1026     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1027     pa_assert(s->state == PA_STREAM_CREATING);
1028
1029     pa_stream_ref(s);
1030
1031     if (command != PA_COMMAND_REPLY) {
1032         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1033             goto finish;
1034
1035         pa_stream_set_state(s, PA_STREAM_FAILED);
1036         goto finish;
1037     }
1038
1039     if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
1040         s->channel == PA_INVALID_INDEX ||
1041         ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
1042         ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
1043         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1044         goto finish;
1045     }
1046
1047     s->requested_bytes = (int64_t) requested_bytes;
1048
1049     if (s->context->version >= 9) {
1050         if (s->direction == PA_STREAM_PLAYBACK) {
1051             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1052                 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
1053                 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
1054                 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
1055                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1056                 goto finish;
1057             }
1058         } else if (s->direction == PA_STREAM_RECORD) {
1059             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1060                 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
1061                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1062                 goto finish;
1063             }
1064         }
1065     }
1066
1067     if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
1068         pa_sample_spec ss;
1069         pa_channel_map cm;
1070         const char *dn = NULL;
1071         pa_bool_t suspended;
1072
1073         if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1074             pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1075             pa_tagstruct_getu32(t, &s->device_index) < 0 ||
1076             pa_tagstruct_gets(t, &dn) < 0 ||
1077             pa_tagstruct_get_boolean(t, &suspended) < 0) {
1078             pa_context_fail(s->context, PA_ERR_PROTOCOL);
1079             goto finish;
1080         }
1081
1082         if (!dn || s->device_index == PA_INVALID_INDEX ||
1083             ss.channels != cm.channels ||
1084             !pa_channel_map_valid(&cm) ||
1085             !pa_sample_spec_valid(&ss) ||
1086             (s->n_formats == 0 && (
1087                 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
1088                 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
1089                 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))))) {
1090             pa_context_fail(s->context, PA_ERR_PROTOCOL);
1091             goto finish;
1092         }
1093
1094         pa_xfree(s->device_name);
1095         s->device_name = pa_xstrdup(dn);
1096         s->suspended = suspended;
1097
1098         s->channel_map = cm;
1099         s->sample_spec = ss;
1100     }
1101
1102     if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
1103         pa_usec_t usec;
1104
1105         if (pa_tagstruct_get_usec(t, &usec) < 0) {
1106             pa_context_fail(s->context, PA_ERR_PROTOCOL);
1107             goto finish;
1108         }
1109
1110         if (s->direction == PA_STREAM_RECORD)
1111             s->timing_info.configured_source_usec = usec;
1112         else
1113             s->timing_info.configured_sink_usec = usec;
1114     }
1115
1116     if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
1117         || s->context->version >= 22) {
1118
1119         pa_format_info *f = pa_format_info_new();
1120         pa_tagstruct_get_format_info(t, f);
1121
1122         if (pa_format_info_valid(f))
1123             s->format = f;
1124         else {
1125             pa_format_info_free(f);
1126             if (s->n_formats > 0) {
1127                 /* We used the extended API, so we should have got back a proper format */
1128                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1129                 goto finish;
1130             }
1131         }
1132     }
1133
1134     if (!pa_tagstruct_eof(t)) {
1135         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1136         goto finish;
1137     }
1138
1139     if (s->direction == PA_STREAM_RECORD) {
1140         pa_assert(!s->record_memblockq);
1141
1142         s->record_memblockq = pa_memblockq_new(
1143                 0,
1144                 s->buffer_attr.maxlength,
1145                 0,
1146                 pa_frame_size(&s->sample_spec),
1147                 1,
1148                 0,
1149                 0,
1150                 NULL);
1151     }
1152
1153     s->channel_valid = TRUE;
1154     pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);
1155
1156     create_stream_complete(s);
1157
1158 finish:
1159     pa_stream_unref(s);
1160 }
1161
1162 static int create_stream(
1163         pa_stream_direction_t direction,
1164         pa_stream *s,
1165         const char *dev,
1166         const pa_buffer_attr *attr,
1167         pa_stream_flags_t flags,
1168         const pa_cvolume *volume,
1169         pa_stream *sync_stream) {
1170
1171     pa_tagstruct *t;
1172     uint32_t tag;
1173     pa_bool_t volume_set = !!volume;
1174     pa_cvolume cv;
1175     uint32_t i;
1176
1177     pa_assert(s);
1178     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1179     pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1180
1181     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1182     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1183     PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1184     PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1185                                               PA_STREAM_INTERPOLATE_TIMING|
1186                                               PA_STREAM_NOT_MONOTONIC|
1187                                               PA_STREAM_AUTO_TIMING_UPDATE|
1188                                               PA_STREAM_NO_REMAP_CHANNELS|
1189                                               PA_STREAM_NO_REMIX_CHANNELS|
1190                                               PA_STREAM_FIX_FORMAT|
1191                                               PA_STREAM_FIX_RATE|
1192                                               PA_STREAM_FIX_CHANNELS|
1193                                               PA_STREAM_DONT_MOVE|
1194                                               PA_STREAM_VARIABLE_RATE|
1195                                               PA_STREAM_PEAK_DETECT|
1196                                               PA_STREAM_START_MUTED|
1197                                               PA_STREAM_ADJUST_LATENCY|
1198                                               PA_STREAM_EARLY_REQUESTS|
1199                                               PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1200                                               PA_STREAM_START_UNMUTED|
1201                                               PA_STREAM_FAIL_ON_SUSPEND|
1202                                               PA_STREAM_RELATIVE_VOLUME|
1203                                               PA_STREAM_PASSTHROUGH)), PA_ERR_INVALID);
1204
1205
1206     PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1207     PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1208     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1209     /* Although some of the other flags are not supported on older
1210      * version, we don't check for them here, because it doesn't hurt
1211      * when they are passed but actually not supported. This makes
1212      * client development easier */
1213
1214     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1215     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1216     PA_CHECK_VALIDITY(s->context, !volume || (pa_sample_spec_valid(&s->sample_spec) && volume->channels == s->sample_spec.channels), PA_ERR_INVALID);
1217     PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1218     PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1219
1220     pa_stream_ref(s);
1221
1222     s->direction = direction;
1223
1224     if (sync_stream)
1225         s->syncid = sync_stream->syncid;
1226
1227     if (attr)
1228         s->buffer_attr = *attr;
1229     patch_buffer_attr(s, &s->buffer_attr, &flags);
1230
1231     s->flags = flags;
1232     s->corked = !!(flags & PA_STREAM_START_CORKED);
1233
1234     if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1235         pa_usec_t x;
1236
1237         x = pa_rtclock_now();
1238
1239         pa_assert(!s->smoother);
1240         s->smoother = pa_smoother_new(
1241                 SMOOTHER_ADJUST_TIME,
1242                 SMOOTHER_HISTORY_TIME,
1243                 !(flags & PA_STREAM_NOT_MONOTONIC),
1244                 TRUE,
1245                 SMOOTHER_MIN_HISTORY,
1246                 x,
1247                 TRUE);
1248     }
1249
1250     if (!dev)
1251         dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1252
1253     t = pa_tagstruct_command(
1254             s->context,
1255             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1256             &tag);
1257
1258     if (s->context->version < 13)
1259         pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1260
1261     pa_tagstruct_put(
1262             t,
1263             PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1264             PA_TAG_CHANNEL_MAP, &s->channel_map,
1265             PA_TAG_U32, PA_INVALID_INDEX,
1266             PA_TAG_STRING, dev,
1267             PA_TAG_U32, s->buffer_attr.maxlength,
1268             PA_TAG_BOOLEAN, s->corked,
1269             PA_TAG_INVALID);
1270
1271     if (!volume) {
1272         if (pa_sample_spec_valid(&s->sample_spec))
1273             volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1274         else {
1275             /* This is not really relevant, since no volume was set, and
1276              * the real number of channels is embedded in the format_info
1277              * structure */
1278             volume = pa_cvolume_reset(&cv, PA_CHANNELS_MAX);
1279         }
1280     }
1281
1282     if (s->direction == PA_STREAM_PLAYBACK) {
1283         pa_tagstruct_put(
1284                 t,
1285                 PA_TAG_U32, s->buffer_attr.tlength,
1286                 PA_TAG_U32, s->buffer_attr.prebuf,
1287                 PA_TAG_U32, s->buffer_attr.minreq,
1288                 PA_TAG_U32, s->syncid,
1289                 PA_TAG_INVALID);
1290
1291         pa_tagstruct_put_cvolume(t, volume);
1292     } else
1293         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1294
1295     if (s->context->version >= 12) {
1296         pa_tagstruct_put(
1297                 t,
1298                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1299                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1300                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1301                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1302                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1303                 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1304                 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1305                 PA_TAG_INVALID);
1306     }
1307
1308     if (s->context->version >= 13) {
1309
1310         if (s->direction == PA_STREAM_PLAYBACK)
1311             pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1312         else
1313             pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1314
1315         pa_tagstruct_put(
1316                 t,
1317                 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1318                 PA_TAG_PROPLIST, s->proplist,
1319                 PA_TAG_INVALID);
1320
1321         if (s->direction == PA_STREAM_RECORD)
1322             pa_tagstruct_putu32(t, s->direct_on_input);
1323     }
1324
1325     if (s->context->version >= 14) {
1326
1327         if (s->direction == PA_STREAM_PLAYBACK)
1328             pa_tagstruct_put_boolean(t, volume_set);
1329
1330         pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1331     }
1332
1333     if (s->context->version >= 15) {
1334
1335         if (s->direction == PA_STREAM_PLAYBACK)
1336             pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1337
1338         pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1339         pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1340     }
1341
1342     if (s->context->version >= 17 && s->direction == PA_STREAM_PLAYBACK)
1343         pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1344
1345     if (s->context->version >= 18 && s->direction == PA_STREAM_PLAYBACK)
1346         pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1347
1348     if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
1349         || s->context->version >= 22) {
1350
1351         pa_tagstruct_putu8(t, s->n_formats);
1352         for (i = 0; i < s->n_formats; i++)
1353             pa_tagstruct_put_format_info(t, s->req_formats[i]);
1354     }
1355
1356     if (s->context->version >= 22 && s->direction == PA_STREAM_RECORD) {
1357         pa_tagstruct_put_cvolume(t, volume);
1358         pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1359         pa_tagstruct_put_boolean(t, volume_set);
1360         pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1361         pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1362         pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1363     }
1364
1365     pa_pstream_send_tagstruct(s->context->pstream, t);
1366     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1367
1368     pa_stream_set_state(s, PA_STREAM_CREATING);
1369
1370     pa_stream_unref(s);
1371     return 0;
1372 }
1373
1374 int pa_stream_connect_playback(
1375         pa_stream *s,
1376         const char *dev,
1377         const pa_buffer_attr *attr,
1378         pa_stream_flags_t flags,
1379         const pa_cvolume *volume,
1380         pa_stream *sync_stream) {
1381
1382     pa_assert(s);
1383     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1384
1385     return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1386 }
1387
1388 int pa_stream_connect_record(
1389         pa_stream *s,
1390         const char *dev,
1391         const pa_buffer_attr *attr,
1392         pa_stream_flags_t flags) {
1393
1394     pa_assert(s);
1395     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1396
1397     return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1398 }
1399
1400 int pa_stream_begin_write(
1401         pa_stream *s,
1402         void **data,
1403         size_t *nbytes) {
1404
1405     pa_assert(s);
1406     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1407
1408     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1409     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1410     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1411     PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1412     PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1413
1414     if (*nbytes != (size_t) -1) {
1415         size_t m, fs;
1416
1417         m = pa_mempool_block_size_max(s->context->mempool);
1418         fs = pa_frame_size(&s->sample_spec);
1419
1420         m = (m / fs) * fs;
1421         if (*nbytes > m)
1422             *nbytes = m;
1423     }
1424
1425     if (!s->write_memblock) {
1426         s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1427         s->write_data = pa_memblock_acquire(s->write_memblock);
1428     }
1429
1430     *data = s->write_data;
1431     *nbytes = pa_memblock_get_length(s->write_memblock);
1432
1433     return 0;
1434 }
1435
1436 int pa_stream_cancel_write(
1437         pa_stream *s) {
1438
1439     pa_assert(s);
1440     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1441
1442     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1443     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1444     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1445     PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1446
1447     pa_assert(s->write_data);
1448
1449     pa_memblock_release(s->write_memblock);
1450     pa_memblock_unref(s->write_memblock);
1451     s->write_memblock = NULL;
1452     s->write_data = NULL;
1453
1454     return 0;
1455 }
1456
1457 int pa_stream_write(
1458         pa_stream *s,
1459         const void *data,
1460         size_t length,
1461         pa_free_cb_t free_cb,
1462         int64_t offset,
1463         pa_seek_mode_t seek) {
1464
1465     pa_assert(s);
1466     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1467     pa_assert(data);
1468
1469     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1470     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1471     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1472     PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1473     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1474     PA_CHECK_VALIDITY(s->context,
1475                       !s->write_memblock ||
1476                       ((data >= s->write_data) &&
1477                        ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1478                       PA_ERR_INVALID);
1479     PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1480
1481     if (s->write_memblock) {
1482         pa_memchunk chunk;
1483
1484         /* pa_stream_write_begin() was called before */
1485
1486         pa_memblock_release(s->write_memblock);
1487
1488         chunk.memblock = s->write_memblock;
1489         chunk.index = (const char *) data - (const char *) s->write_data;
1490         chunk.length = length;
1491
1492         s->write_memblock = NULL;
1493         s->write_data = NULL;
1494
1495         pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1496         pa_memblock_unref(chunk.memblock);
1497
1498     } else {
1499         pa_seek_mode_t t_seek = seek;
1500         int64_t t_offset = offset;
1501         size_t t_length = length;
1502         const void *t_data = data;
1503
1504         /* pa_stream_write_begin() was not called before */
1505
1506         while (t_length > 0) {
1507             pa_memchunk chunk;
1508
1509             chunk.index = 0;
1510
1511             if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1512                 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1513                 chunk.length = t_length;
1514             } else {
1515                 void *d;
1516
1517                 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1518                 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1519
1520                 d = pa_memblock_acquire(chunk.memblock);
1521                 memcpy(d, t_data, chunk.length);
1522                 pa_memblock_release(chunk.memblock);
1523             }
1524
1525             pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1526
1527             t_offset = 0;
1528             t_seek = PA_SEEK_RELATIVE;
1529
1530             t_data = (const uint8_t*) t_data + chunk.length;
1531             t_length -= chunk.length;
1532
1533             pa_memblock_unref(chunk.memblock);
1534         }
1535
1536         if (free_cb && pa_pstream_get_shm(s->context->pstream))
1537             free_cb((void*) data);
1538     }
1539
1540     /* This is obviously wrong since we ignore the seeking index . But
1541      * that's OK, the server side applies the same error */
1542     s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1543
1544     /* pa_log("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes); */
1545
1546     if (s->direction == PA_STREAM_PLAYBACK) {
1547
1548         /* Update latency request correction */
1549         if (s->write_index_corrections[s->current_write_index_correction].valid) {
1550
1551             if (seek == PA_SEEK_ABSOLUTE) {
1552                 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1553                 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1554                 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1555             } else if (seek == PA_SEEK_RELATIVE) {
1556                 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1557                     s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1558             } else
1559                 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1560         }
1561
1562         /* Update the write index in the already available latency data */
1563         if (s->timing_info_valid) {
1564
1565             if (seek == PA_SEEK_ABSOLUTE) {
1566                 s->timing_info.write_index_corrupt = FALSE;
1567                 s->timing_info.write_index = offset + (int64_t) length;
1568             } else if (seek == PA_SEEK_RELATIVE) {
1569                 if (!s->timing_info.write_index_corrupt)
1570                     s->timing_info.write_index += offset + (int64_t) length;
1571             } else
1572                 s->timing_info.write_index_corrupt = TRUE;
1573         }
1574
1575         if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1576             request_auto_timing_update(s, TRUE);
1577     }
1578
1579     return 0;
1580 }
1581
1582 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1583     pa_assert(s);
1584     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1585     pa_assert(data);
1586     pa_assert(length);
1587
1588     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1589     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1590     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1591
1592     if (!s->peek_memchunk.memblock) {
1593
1594         if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1595             *data = NULL;
1596             *length = 0;
1597             return 0;
1598         }
1599
1600         s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1601     }
1602
1603     pa_assert(s->peek_data);
1604     *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1605     *length = s->peek_memchunk.length;
1606     return 0;
1607 }
1608
1609 int pa_stream_drop(pa_stream *s) {
1610     pa_assert(s);
1611     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1612
1613     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1614     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1615     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1616     PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1617
1618     pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1619
1620     /* Fix the simulated local read index */
1621     if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1622         s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1623
1624     pa_assert(s->peek_data);
1625     pa_memblock_release(s->peek_memchunk.memblock);
1626     pa_memblock_unref(s->peek_memchunk.memblock);
1627     pa_memchunk_reset(&s->peek_memchunk);
1628
1629     return 0;
1630 }
1631
1632 size_t pa_stream_writable_size(pa_stream *s) {
1633     pa_assert(s);
1634     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1635
1636     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1637     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1638     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1639
1640     return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1641 }
1642
1643 size_t pa_stream_readable_size(pa_stream *s) {
1644     pa_assert(s);
1645     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1646
1647     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1648     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1649     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1650
1651     return pa_memblockq_get_length(s->record_memblockq);
1652 }
1653
1654 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1655     pa_operation *o;
1656     pa_tagstruct *t;
1657     uint32_t tag;
1658
1659     pa_assert(s);
1660     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1661
1662     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1663     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1664     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1665
1666     /* Ask for a timing update before we cork/uncork to get the best
1667      * accuracy for the transport latency suitable for the
1668      * check_smoother_status() call in the started callback */
1669     request_auto_timing_update(s, TRUE);
1670
1671     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1672
1673     t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1674     pa_tagstruct_putu32(t, s->channel);
1675     pa_pstream_send_tagstruct(s->context->pstream, t);
1676     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1677
1678     /* This might cause the read index to continue again, hence
1679      * let's request a timing update */
1680     request_auto_timing_update(s, TRUE);
1681
1682     return o;
1683 }
1684
1685 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1686     pa_usec_t usec;
1687
1688     pa_assert(s);
1689     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1690     pa_assert(s->state == PA_STREAM_READY);
1691     pa_assert(s->direction != PA_STREAM_UPLOAD);
1692     pa_assert(s->timing_info_valid);
1693     pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1694     pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1695
1696     if (s->direction == PA_STREAM_PLAYBACK) {
1697         /* The last byte that was written into the output device
1698          * had this time value associated */
1699         usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1700
1701         if (!s->corked && !s->suspended) {
1702
1703             if (!ignore_transport)
1704                 /* Because the latency info took a little time to come
1705                  * to us, we assume that the real output time is actually
1706                  * a little ahead */
1707                 usec += s->timing_info.transport_usec;
1708
1709             /* However, the output device usually maintains a buffer
1710                too, hence the real sample currently played is a little
1711                back  */
1712             if (s->timing_info.sink_usec >= usec)
1713                 usec = 0;
1714             else
1715                 usec -= s->timing_info.sink_usec;
1716         }
1717
1718     } else {
1719         pa_assert(s->direction == PA_STREAM_RECORD);
1720
1721         /* The last byte written into the server side queue had
1722          * this time value associated */
1723         usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1724
1725         if (!s->corked && !s->suspended) {
1726
1727             if (!ignore_transport)
1728                 /* Add transport latency */
1729                 usec += s->timing_info.transport_usec;
1730
1731             /* Add latency of data in device buffer */
1732             usec += s->timing_info.source_usec;
1733
1734             /* If this is a monitor source, we need to correct the
1735              * time by the playback device buffer */
1736             if (s->timing_info.sink_usec >= usec)
1737                 usec = 0;
1738             else
1739                 usec -= s->timing_info.sink_usec;
1740         }
1741     }
1742
1743     return usec;
1744 }
1745
1746 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1747     pa_operation *o = userdata;
1748     struct timeval local, remote, now;
1749     pa_timing_info *i;
1750     pa_bool_t playing = FALSE;
1751     uint64_t underrun_for = 0, playing_for = 0;
1752
1753     pa_assert(pd);
1754     pa_assert(o);
1755     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1756
1757     if (!o->context || !o->stream)
1758         goto finish;
1759
1760     i = &o->stream->timing_info;
1761
1762     o->stream->timing_info_valid = FALSE;
1763     i->write_index_corrupt = TRUE;
1764     i->read_index_corrupt = TRUE;
1765
1766     if (command != PA_COMMAND_REPLY) {
1767         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1768             goto finish;
1769
1770     } else {
1771
1772         if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1773             pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1774             pa_tagstruct_get_boolean(t, &playing) < 0 ||
1775             pa_tagstruct_get_timeval(t, &local) < 0 ||
1776             pa_tagstruct_get_timeval(t, &remote) < 0 ||
1777             pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1778             pa_tagstruct_gets64(t, &i->read_index) < 0) {
1779
1780             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1781             goto finish;
1782         }
1783
1784         if (o->context->version >= 13 &&
1785             o->stream->direction == PA_STREAM_PLAYBACK)
1786             if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1787                 pa_tagstruct_getu64(t, &playing_for) < 0) {
1788
1789                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1790                 goto finish;
1791             }
1792
1793
1794         if (!pa_tagstruct_eof(t)) {
1795             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1796             goto finish;
1797         }
1798         o->stream->timing_info_valid = TRUE;
1799         i->write_index_corrupt = FALSE;
1800         i->read_index_corrupt = FALSE;
1801
1802         i->playing = (int) playing;
1803         i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1804
1805         pa_gettimeofday(&now);
1806
1807         /* Calculate timestamps */
1808         if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1809             /* local and remote seem to have synchronized clocks */
1810
1811             if (o->stream->direction == PA_STREAM_PLAYBACK)
1812                 i->transport_usec = pa_timeval_diff(&remote, &local);
1813             else
1814                 i->transport_usec = pa_timeval_diff(&now, &remote);
1815
1816             i->synchronized_clocks = TRUE;
1817             i->timestamp = remote;
1818         } else {
1819             /* clocks are not synchronized, let's estimate latency then */
1820             i->transport_usec = pa_timeval_diff(&now, &local)/2;
1821             i->synchronized_clocks = FALSE;
1822             i->timestamp = local;
1823             pa_timeval_add(&i->timestamp, i->transport_usec);
1824         }
1825
1826         /* Invalidate read and write indexes if necessary */
1827         if (tag < o->stream->read_index_not_before)
1828             i->read_index_corrupt = TRUE;
1829
1830         if (tag < o->stream->write_index_not_before)
1831             i->write_index_corrupt = TRUE;
1832
1833         if (o->stream->direction == PA_STREAM_PLAYBACK) {
1834             /* Write index correction */
1835
1836             int n, j;
1837             uint32_t ctag = tag;
1838
1839             /* Go through the saved correction values and add up the
1840              * total correction.*/
1841             for (n = 0, j = o->stream->current_write_index_correction+1;
1842                  n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1843                  n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1844
1845                 /* Step over invalid data or out-of-date data */
1846                 if (!o->stream->write_index_corrections[j].valid ||
1847                     o->stream->write_index_corrections[j].tag < ctag)
1848                     continue;
1849
1850                 /* Make sure that everything is in order */
1851                 ctag = o->stream->write_index_corrections[j].tag+1;
1852
1853                 /* Now fix the write index */
1854                 if (o->stream->write_index_corrections[j].corrupt) {
1855                     /* A corrupting seek was made */
1856                     i->write_index_corrupt = TRUE;
1857                 } else if (o->stream->write_index_corrections[j].absolute) {
1858                     /* An absolute seek was made */
1859                     i->write_index = o->stream->write_index_corrections[j].value;
1860                     i->write_index_corrupt = FALSE;
1861                 } else if (!i->write_index_corrupt) {
1862                     /* A relative seek was made */
1863                     i->write_index += o->stream->write_index_corrections[j].value;
1864                 }
1865             }
1866
1867             /* Clear old correction entries */
1868             for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1869                 if (!o->stream->write_index_corrections[n].valid)
1870                     continue;
1871
1872                 if (o->stream->write_index_corrections[n].tag <= tag)
1873                     o->stream->write_index_corrections[n].valid = FALSE;
1874             }
1875         }
1876
1877         if (o->stream->direction == PA_STREAM_RECORD) {
1878             /* Read index correction */
1879
1880             if (!i->read_index_corrupt)
1881                 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1882         }
1883
1884         /* Update smoother if we're not corked */
1885         if (o->stream->smoother && !o->stream->corked) {
1886             pa_usec_t u, x;
1887
1888             u = x = pa_rtclock_now() - i->transport_usec;
1889
1890             if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1891                 pa_usec_t su;
1892
1893                 /* If we weren't playing then it will take some time
1894                  * until the audio will actually come out through the
1895                  * speakers. Since we follow that timing here, we need
1896                  * to try to fix this up */
1897
1898                 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1899
1900                 if (su < i->sink_usec)
1901                     x += i->sink_usec - su;
1902             }
1903
1904             if (!i->playing)
1905                 pa_smoother_pause(o->stream->smoother, x);
1906
1907             /* Update the smoother */
1908             if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1909                 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1910                 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1911
1912             if (i->playing)
1913                 pa_smoother_resume(o->stream->smoother, x, TRUE);
1914         }
1915     }
1916
1917     o->stream->auto_timing_update_requested = FALSE;
1918
1919     if (o->stream->latency_update_callback)
1920         o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1921
1922     if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1923         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1924         cb(o->stream, o->stream->timing_info_valid, o->userdata);
1925     }
1926
1927 finish:
1928
1929     pa_operation_done(o);
1930     pa_operation_unref(o);
1931 }
1932
1933 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1934     uint32_t tag;
1935     pa_operation *o;
1936     pa_tagstruct *t;
1937     struct timeval now;
1938     int cidx = 0;
1939
1940     pa_assert(s);
1941     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1942
1943     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1944     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1945     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1946
1947     if (s->direction == PA_STREAM_PLAYBACK) {
1948         /* Find a place to store the write_index correction data for this entry */
1949         cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1950
1951         /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1952         PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1953     }
1954     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1955
1956     t = pa_tagstruct_command(
1957             s->context,
1958             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1959             &tag);
1960     pa_tagstruct_putu32(t, s->channel);
1961     pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1962
1963     pa_pstream_send_tagstruct(s->context->pstream, t);
1964     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1965
1966     if (s->direction == PA_STREAM_PLAYBACK) {
1967         /* Fill in initial correction data */
1968
1969         s->current_write_index_correction = cidx;
1970
1971         s->write_index_corrections[cidx].valid = TRUE;
1972         s->write_index_corrections[cidx].absolute = FALSE;
1973         s->write_index_corrections[cidx].corrupt = FALSE;
1974         s->write_index_corrections[cidx].tag = tag;
1975         s->write_index_corrections[cidx].value = 0;
1976     }
1977
1978     return o;
1979 }
1980
1981 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1982     pa_stream *s = userdata;
1983
1984     pa_assert(pd);
1985     pa_assert(s);
1986     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1987
1988     pa_stream_ref(s);
1989
1990     if (command != PA_COMMAND_REPLY) {
1991         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1992             goto finish;
1993
1994         pa_stream_set_state(s, PA_STREAM_FAILED);
1995         goto finish;
1996     } else if (!pa_tagstruct_eof(t)) {
1997         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1998         goto finish;
1999     }
2000
2001     pa_stream_set_state(s, PA_STREAM_TERMINATED);
2002
2003 finish:
2004     pa_stream_unref(s);
2005 }
2006
2007 int pa_stream_disconnect(pa_stream *s) {
2008     pa_tagstruct *t;
2009     uint32_t tag;
2010
2011     pa_assert(s);
2012     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2013
2014     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2015     PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
2016     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
2017
2018     pa_stream_ref(s);
2019
2020     t = pa_tagstruct_command(
2021             s->context,
2022             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
2023                         (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
2024             &tag);
2025     pa_tagstruct_putu32(t, s->channel);
2026     pa_pstream_send_tagstruct(s->context->pstream, t);
2027     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
2028
2029     pa_stream_unref(s);
2030     return 0;
2031 }
2032
2033 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2034     pa_assert(s);
2035     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2036
2037     if (pa_detect_fork())
2038         return;
2039
2040     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2041         return;
2042
2043     s->read_callback = cb;
2044     s->read_userdata = userdata;
2045 }
2046
2047 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2048     pa_assert(s);
2049     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2050
2051     if (pa_detect_fork())
2052         return;
2053
2054     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2055         return;
2056
2057     s->write_callback = cb;
2058     s->write_userdata = userdata;
2059 }
2060
2061 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2062     pa_assert(s);
2063     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2064
2065     if (pa_detect_fork())
2066         return;
2067
2068     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2069         return;
2070
2071     s->state_callback = cb;
2072     s->state_userdata = userdata;
2073 }
2074
2075 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2076     pa_assert(s);
2077     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2078
2079     if (pa_detect_fork())
2080         return;
2081
2082     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2083         return;
2084
2085     s->overflow_callback = cb;
2086     s->overflow_userdata = userdata;
2087 }
2088
2089 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2090     pa_assert(s);
2091     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2092
2093     if (pa_detect_fork())
2094         return;
2095
2096     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2097         return;
2098
2099     s->underflow_callback = cb;
2100     s->underflow_userdata = userdata;
2101 }
2102
2103 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2104     pa_assert(s);
2105     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2106
2107     if (pa_detect_fork())
2108         return;
2109
2110     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2111         return;
2112
2113     s->latency_update_callback = cb;
2114     s->latency_update_userdata = userdata;
2115 }
2116
2117 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2118     pa_assert(s);
2119     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2120
2121     if (pa_detect_fork())
2122         return;
2123
2124     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2125         return;
2126
2127     s->moved_callback = cb;
2128     s->moved_userdata = userdata;
2129 }
2130
2131 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2132     pa_assert(s);
2133     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2134
2135     if (pa_detect_fork())
2136         return;
2137
2138     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2139         return;
2140
2141     s->suspended_callback = cb;
2142     s->suspended_userdata = userdata;
2143 }
2144
2145 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2146     pa_assert(s);
2147     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2148
2149     if (pa_detect_fork())
2150         return;
2151
2152     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2153         return;
2154
2155     s->started_callback = cb;
2156     s->started_userdata = userdata;
2157 }
2158
2159 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
2160     pa_assert(s);
2161     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2162
2163     if (pa_detect_fork())
2164         return;
2165
2166     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2167         return;
2168
2169     s->event_callback = cb;
2170     s->event_userdata = userdata;
2171 }
2172
2173 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2174     pa_assert(s);
2175     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2176
2177     if (pa_detect_fork())
2178         return;
2179
2180     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2181         return;
2182
2183     s->buffer_attr_callback = cb;
2184     s->buffer_attr_userdata = userdata;
2185 }
2186
2187 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2188     pa_operation *o = userdata;
2189     int success = 1;
2190
2191     pa_assert(pd);
2192     pa_assert(o);
2193     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2194
2195     if (!o->context)
2196         goto finish;
2197
2198     if (command != PA_COMMAND_REPLY) {
2199         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2200             goto finish;
2201
2202         success = 0;
2203     } else if (!pa_tagstruct_eof(t)) {
2204         pa_context_fail(o->context, PA_ERR_PROTOCOL);
2205         goto finish;
2206     }
2207
2208     if (o->callback) {
2209         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2210         cb(o->stream, success, o->userdata);
2211     }
2212
2213 finish:
2214     pa_operation_done(o);
2215     pa_operation_unref(o);
2216 }
2217
2218 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2219     pa_operation *o;
2220     pa_tagstruct *t;
2221     uint32_t tag;
2222
2223     pa_assert(s);
2224     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2225
2226     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2227     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2228     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2229
2230     /* Ask for a timing update before we cork/uncork to get the best
2231      * accuracy for the transport latency suitable for the
2232      * check_smoother_status() call in the started callback */
2233     request_auto_timing_update(s, TRUE);
2234
2235     s->corked = b;
2236
2237     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2238
2239     t = pa_tagstruct_command(
2240             s->context,
2241             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2242             &tag);
2243     pa_tagstruct_putu32(t, s->channel);
2244     pa_tagstruct_put_boolean(t, !!b);
2245     pa_pstream_send_tagstruct(s->context->pstream, t);
2246     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2247
2248     check_smoother_status(s, FALSE, FALSE, FALSE);
2249
2250     /* This might cause the indexes to hang/start again, hence let's
2251      * request a timing update, after the cork/uncork, too */
2252     request_auto_timing_update(s, TRUE);
2253
2254     return o;
2255 }
2256
2257 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2258     pa_tagstruct *t;
2259     pa_operation *o;
2260     uint32_t tag;
2261
2262     pa_assert(s);
2263     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2264
2265     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2266     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2267
2268     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2269
2270     t = pa_tagstruct_command(s->context, command, &tag);
2271     pa_tagstruct_putu32(t, s->channel);
2272     pa_pstream_send_tagstruct(s->context->pstream, t);
2273     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2274
2275     return o;
2276 }
2277
2278 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2279     pa_operation *o;
2280
2281     pa_assert(s);
2282     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2283
2284     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2285     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2286     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2287
2288     /* Ask for a timing update *before* the flush, so that the
2289      * transport usec is as up to date as possible when we get the
2290      * underflow message and update the smoother status*/
2291     request_auto_timing_update(s, TRUE);
2292
2293     if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2294         return NULL;
2295
2296     if (s->direction == PA_STREAM_PLAYBACK) {
2297
2298         if (s->write_index_corrections[s->current_write_index_correction].valid)
2299             s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2300
2301         if (s->buffer_attr.prebuf > 0)
2302             check_smoother_status(s, FALSE, FALSE, TRUE);
2303
2304         /* This will change the write index, but leave the
2305          * read index untouched. */
2306         invalidate_indexes(s, FALSE, TRUE);
2307
2308     } else
2309         /* For record streams this has no influence on the write
2310          * index, but the read index might jump. */
2311         invalidate_indexes(s, TRUE, FALSE);
2312
2313     /* Note that we do not update requested_bytes here. This is
2314      * because we cannot really know how data actually was dropped
2315      * from the write index due to this. This 'error' will be applied
2316      * by both client and server and hence we should be fine. */
2317
2318     return o;
2319 }
2320
2321 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2322     pa_operation *o;
2323
2324     pa_assert(s);
2325     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2326
2327     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2328     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2329     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2330     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2331
2332     /* Ask for a timing update before we cork/uncork to get the best
2333      * accuracy for the transport latency suitable for the
2334      * check_smoother_status() call in the started callback */
2335     request_auto_timing_update(s, TRUE);
2336
2337     if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2338         return NULL;
2339
2340     /* This might cause the read index to hang again, hence
2341      * let's request a timing update */
2342     request_auto_timing_update(s, TRUE);
2343
2344     return o;
2345 }
2346
2347 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2348     pa_operation *o;
2349
2350     pa_assert(s);
2351     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2352
2353     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2354     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2355     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2356     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2357
2358     /* Ask for a timing update before we cork/uncork to get the best
2359      * accuracy for the transport latency suitable for the
2360      * check_smoother_status() call in the started callback */
2361     request_auto_timing_update(s, TRUE);
2362
2363     if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2364         return NULL;
2365
2366     /* This might cause the read index to start moving again, hence
2367      * let's request a timing update */
2368     request_auto_timing_update(s, TRUE);
2369
2370     return o;
2371 }
2372
2373 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2374     pa_operation *o;
2375
2376     pa_assert(s);
2377     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2378     pa_assert(name);
2379
2380     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2381     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2382     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2383
2384     if (s->context->version >= 13) {
2385         pa_proplist *p = pa_proplist_new();
2386
2387         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2388         o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2389         pa_proplist_free(p);
2390     } else {
2391         pa_tagstruct *t;
2392         uint32_t tag;
2393
2394         o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2395         t = pa_tagstruct_command(
2396                 s->context,
2397                 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2398                 &tag);
2399         pa_tagstruct_putu32(t, s->channel);
2400         pa_tagstruct_puts(t, name);
2401         pa_pstream_send_tagstruct(s->context->pstream, t);
2402         pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2403     }
2404
2405     return o;
2406 }
2407
2408 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2409     pa_usec_t usec;
2410
2411     pa_assert(s);
2412     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2413
2414     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2415     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2416     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2417     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2418     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2419     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2420
2421     if (s->smoother)
2422         usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2423     else
2424         usec = calc_time(s, FALSE);
2425
2426     /* Make sure the time runs monotonically */
2427     if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2428         if (usec < s->previous_time)
2429             usec = s->previous_time;
2430         else
2431             s->previous_time = usec;
2432     }
2433
2434     if (r_usec)
2435         *r_usec = usec;
2436
2437     return 0;
2438 }
2439
2440 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2441     pa_assert(s);
2442     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2443
2444     if (negative)
2445         *negative = 0;
2446
2447     if (a >= b)
2448         return a-b;
2449     else {
2450         if (negative && s->direction == PA_STREAM_RECORD) {
2451             *negative = 1;
2452             return b-a;
2453         } else
2454             return 0;
2455     }
2456 }
2457
2458 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2459     pa_usec_t t, c;
2460     int r;
2461     int64_t cindex;
2462
2463     pa_assert(s);
2464     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2465     pa_assert(r_usec);
2466
2467     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2468     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2469     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2470     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2471     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2472     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2473
2474     if ((r = pa_stream_get_time(s, &t)) < 0)
2475         return r;
2476
2477     if (s->direction == PA_STREAM_PLAYBACK)
2478         cindex = s->timing_info.write_index;
2479     else
2480         cindex = s->timing_info.read_index;
2481
2482     if (cindex < 0)
2483         cindex = 0;
2484
2485     c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2486
2487     if (s->direction == PA_STREAM_PLAYBACK)
2488         *r_usec = time_counter_diff(s, c, t, negative);
2489     else
2490         *r_usec = time_counter_diff(s, t, c, negative);
2491
2492     return 0;
2493 }
2494
2495 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2496     pa_assert(s);
2497     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2498
2499     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2500     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2501     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2502     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2503
2504     return &s->timing_info;
2505 }
2506
2507 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2508     pa_assert(s);
2509     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2510
2511     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2512
2513     return &s->sample_spec;
2514 }
2515
2516 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2517     pa_assert(s);
2518     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2519
2520     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2521
2522     return &s->channel_map;
2523 }
2524
2525 const pa_format_info* pa_stream_get_format_info(pa_stream *s) {
2526     pa_assert(s);
2527     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2528
2529     /* We don't have the format till routing is done */
2530     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2531     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2532
2533     return s->format;
2534 }
2535 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2536     pa_assert(s);
2537     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2538
2539     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2540     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2541     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2542
2543     return &s->buffer_attr;
2544 }
2545
2546 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2547     pa_operation *o = userdata;
2548     int success = 1;
2549
2550     pa_assert(pd);
2551     pa_assert(o);
2552     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2553
2554     if (!o->context)
2555         goto finish;
2556
2557     if (command != PA_COMMAND_REPLY) {
2558         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2559             goto finish;
2560
2561         success = 0;
2562     } else {
2563         if (o->stream->direction == PA_STREAM_PLAYBACK) {
2564             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2565                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2566                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2567                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2568                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2569                 goto finish;
2570             }
2571         } else if (o->stream->direction == PA_STREAM_RECORD) {
2572             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2573                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2574                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2575                 goto finish;
2576             }
2577         }
2578
2579         if (o->stream->context->version >= 13) {
2580             pa_usec_t usec;
2581
2582             if (pa_tagstruct_get_usec(t, &usec) < 0) {
2583                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2584                 goto finish;
2585             }
2586
2587             if (o->stream->direction == PA_STREAM_RECORD)
2588                 o->stream->timing_info.configured_source_usec = usec;
2589             else
2590                 o->stream->timing_info.configured_sink_usec = usec;
2591         }
2592
2593         if (!pa_tagstruct_eof(t)) {
2594             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2595             goto finish;
2596         }
2597     }
2598
2599     if (o->callback) {
2600         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2601         cb(o->stream, success, o->userdata);
2602     }
2603
2604 finish:
2605     pa_operation_done(o);
2606     pa_operation_unref(o);
2607 }
2608
2609
2610 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2611     pa_operation *o;
2612     pa_tagstruct *t;
2613     uint32_t tag;
2614     pa_buffer_attr copy;
2615
2616     pa_assert(s);
2617     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2618     pa_assert(attr);
2619
2620     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2621     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2622     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2623     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2624
2625     /* Ask for a timing update before we cork/uncork to get the best
2626      * accuracy for the transport latency suitable for the
2627      * check_smoother_status() call in the started callback */
2628     request_auto_timing_update(s, TRUE);
2629
2630     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2631
2632     t = pa_tagstruct_command(
2633             s->context,
2634             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2635             &tag);
2636     pa_tagstruct_putu32(t, s->channel);
2637
2638     copy = *attr;
2639     patch_buffer_attr(s, &copy, NULL);
2640     attr = &copy;
2641
2642     pa_tagstruct_putu32(t, attr->maxlength);
2643
2644     if (s->direction == PA_STREAM_PLAYBACK)
2645         pa_tagstruct_put(
2646                 t,
2647                 PA_TAG_U32, attr->tlength,
2648                 PA_TAG_U32, attr->prebuf,
2649                 PA_TAG_U32, attr->minreq,
2650                 PA_TAG_INVALID);
2651     else
2652         pa_tagstruct_putu32(t, attr->fragsize);
2653
2654     if (s->context->version >= 13)
2655         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2656
2657     if (s->context->version >= 14)
2658         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2659
2660     pa_pstream_send_tagstruct(s->context->pstream, t);
2661     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2662
2663     /* This might cause changes in the read/write index, hence let's
2664      * request a timing update */
2665     request_auto_timing_update(s, TRUE);
2666
2667     return o;
2668 }
2669
2670 uint32_t pa_stream_get_device_index(pa_stream *s) {
2671     pa_assert(s);
2672     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2673
2674     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2675     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2676     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2677     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2678     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2679
2680     return s->device_index;
2681 }
2682
2683 const char *pa_stream_get_device_name(pa_stream *s) {
2684     pa_assert(s);
2685     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2686
2687     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2688     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2689     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2690     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2691     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2692
2693     return s->device_name;
2694 }
2695
2696 int pa_stream_is_suspended(pa_stream *s) {
2697     pa_assert(s);
2698     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2699
2700     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2701     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2702     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2703     PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2704
2705     return s->suspended;
2706 }
2707
2708 int pa_stream_is_corked(pa_stream *s) {
2709     pa_assert(s);
2710     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2711
2712     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2713     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2714     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2715
2716     return s->corked;
2717 }
2718
2719 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2720     pa_operation *o = userdata;
2721     int success = 1;
2722
2723     pa_assert(pd);
2724     pa_assert(o);
2725     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2726
2727     if (!o->context)
2728         goto finish;
2729
2730     if (command != PA_COMMAND_REPLY) {
2731         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2732             goto finish;
2733
2734         success = 0;
2735     } else {
2736
2737         if (!pa_tagstruct_eof(t)) {
2738             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2739             goto finish;
2740         }
2741     }
2742
2743     o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2744     pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2745
2746     if (o->callback) {
2747         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2748         cb(o->stream, success, o->userdata);
2749     }
2750
2751 finish:
2752     pa_operation_done(o);
2753     pa_operation_unref(o);
2754 }
2755
2756
2757 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2758     pa_operation *o;
2759     pa_tagstruct *t;
2760     uint32_t tag;
2761
2762     pa_assert(s);
2763     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2764
2765     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2766     PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2767     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2768     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2769     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2770     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2771
2772     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2773     o->private = PA_UINT_TO_PTR(rate);
2774
2775     t = pa_tagstruct_command(
2776             s->context,
2777             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2778             &tag);
2779     pa_tagstruct_putu32(t, s->channel);
2780     pa_tagstruct_putu32(t, rate);
2781
2782     pa_pstream_send_tagstruct(s->context->pstream, t);
2783     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2784
2785     return o;
2786 }
2787
2788 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2789     pa_operation *o;
2790     pa_tagstruct *t;
2791     uint32_t tag;
2792
2793     pa_assert(s);
2794     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2795
2796     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2797     PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2798     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2799     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2800     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2801
2802     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2803
2804     t = pa_tagstruct_command(
2805             s->context,
2806             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2807             &tag);
2808     pa_tagstruct_putu32(t, s->channel);
2809     pa_tagstruct_putu32(t, (uint32_t) mode);
2810     pa_tagstruct_put_proplist(t, p);
2811
2812     pa_pstream_send_tagstruct(s->context->pstream, t);
2813     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2814
2815     /* Please note that we don't update s->proplist here, because we
2816      * don't export that field */
2817
2818     return o;
2819 }
2820
2821 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2822     pa_operation *o;
2823     pa_tagstruct *t;
2824     uint32_t tag;
2825     const char * const*k;
2826
2827     pa_assert(s);
2828     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2829
2830     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2831     PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2832     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2833     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2834     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2835
2836     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2837
2838     t = pa_tagstruct_command(
2839             s->context,
2840             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2841             &tag);
2842     pa_tagstruct_putu32(t, s->channel);
2843
2844     for (k = keys; *k; k++)
2845         pa_tagstruct_puts(t, *k);
2846
2847     pa_tagstruct_puts(t, NULL);
2848
2849     pa_pstream_send_tagstruct(s->context->pstream, t);
2850     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2851
2852     /* Please note that we don't update s->proplist here, because we
2853      * don't export that field */
2854
2855     return o;
2856 }
2857
2858 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2859     pa_assert(s);
2860     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2861
2862     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2863     PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2864     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2865     PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2866
2867     s->direct_on_input = sink_input_idx;
2868
2869     return 0;
2870 }
2871
2872 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2873     pa_assert(s);
2874     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2875
2876     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2877     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2878     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2879
2880     return s->direct_on_input;
2881 }