sink-input: Provide more information to client when format is lost
[profile/ivi/pulseaudio.git] / src / pulse / stream.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/rtclock.h>
34 #include <pulse/xmalloc.h>
35 #include <pulse/fork-detect.h>
36
37 #include <pulsecore/pstream-util.h>
38 #include <pulsecore/log.h>
39 #include <pulsecore/hashmap.h>
40 #include <pulsecore/macro.h>
41 #include <pulsecore/core-rtclock.h>
42 #include <pulsecore/core-util.h>
43
44 #include "internal.h"
45 #include "stream.h"
46
47 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
48 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
49
50 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
51 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
52 #define SMOOTHER_MIN_HISTORY (4)
53
54 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
55     return pa_stream_new_with_proplist(c, name, ss, map, NULL);
56 }
57
58 static void reset_callbacks(pa_stream *s) {
59     s->read_callback = NULL;
60     s->read_userdata = NULL;
61     s->write_callback = NULL;
62     s->write_userdata = NULL;
63     s->state_callback = NULL;
64     s->state_userdata = NULL;
65     s->overflow_callback = NULL;
66     s->overflow_userdata = NULL;
67     s->underflow_callback = NULL;
68     s->underflow_userdata = NULL;
69     s->latency_update_callback = NULL;
70     s->latency_update_userdata = NULL;
71     s->moved_callback = NULL;
72     s->moved_userdata = NULL;
73     s->suspended_callback = NULL;
74     s->suspended_userdata = NULL;
75     s->started_callback = NULL;
76     s->started_userdata = NULL;
77     s->event_callback = NULL;
78     s->event_userdata = NULL;
79     s->buffer_attr_callback = NULL;
80     s->buffer_attr_userdata = NULL;
81 }
82
83 static pa_stream *pa_stream_new_with_proplist_internal(
84         pa_context *c,
85         const char *name,
86         const pa_sample_spec *ss,
87         const pa_channel_map *map,
88         pa_format_info * const *formats,
89         pa_proplist *p) {
90
91     pa_stream *s;
92     int i;
93
94     pa_assert(c);
95     pa_assert(PA_REFCNT_VALUE(c) >= 1);
96     pa_assert((ss == NULL && map == NULL) || formats == NULL);
97
98     PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
99     PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
100
101     s = pa_xnew(pa_stream, 1);
102     PA_REFCNT_INIT(s);
103     s->context = c;
104     s->mainloop = c->mainloop;
105
106     s->direction = PA_STREAM_NODIRECTION;
107     s->state = PA_STREAM_UNCONNECTED;
108     s->flags = 0;
109
110     if (ss)
111         s->sample_spec = *ss;
112     else
113         s->sample_spec.format = PA_SAMPLE_INVALID;
114
115     if (map)
116         s->channel_map = *map;
117     else
118         pa_channel_map_init(&s->channel_map);
119
120     s->n_formats = 0;
121     if (formats) {
122         for (i = 0; formats[i] && i < PA_MAX_FORMATS; i++) {
123             s->n_formats++;
124             s->req_formats[i] = pa_format_info_copy(formats[i]);
125         }
126         /* Make sure the input array was NULL-terminated */
127         pa_assert(formats[i] == NULL);
128     }
129
130     /* We'll get the final negotiated format after connecting */
131     s->format = NULL;
132
133     s->direct_on_input = PA_INVALID_INDEX;
134
135     s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
136     if (name)
137         pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
138
139     s->channel = 0;
140     s->channel_valid = FALSE;
141     s->syncid = c->csyncid++;
142     s->stream_index = PA_INVALID_INDEX;
143
144     s->requested_bytes = 0;
145     memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
146
147     /* We initialize der target length here, so that if the user
148      * passes no explicit buffering metrics the default is similar to
149      * what older PA versions provided. */
150
151     s->buffer_attr.maxlength = (uint32_t) -1;
152     if (ss)
153         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
154     else {
155         /* FIXME: We assume a worst-case compressed format corresponding to
156          * 48000 Hz, 2 ch, S16 PCM, but this can very well be incorrect */
157         pa_sample_spec tmp_ss = {
158             .format   = PA_SAMPLE_S16NE,
159             .rate     = 48000,
160             .channels = 2,
161         };
162         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &tmp_ss); /* 250ms of buffering */
163     }
164     s->buffer_attr.minreq = (uint32_t) -1;
165     s->buffer_attr.prebuf = (uint32_t) -1;
166     s->buffer_attr.fragsize = (uint32_t) -1;
167
168     s->device_index = PA_INVALID_INDEX;
169     s->device_name = NULL;
170     s->suspended = FALSE;
171     s->corked = FALSE;
172
173     s->write_memblock = NULL;
174     s->write_data = NULL;
175
176     pa_memchunk_reset(&s->peek_memchunk);
177     s->peek_data = NULL;
178     s->record_memblockq = NULL;
179
180     memset(&s->timing_info, 0, sizeof(s->timing_info));
181     s->timing_info_valid = FALSE;
182
183     s->previous_time = 0;
184
185     s->read_index_not_before = 0;
186     s->write_index_not_before = 0;
187     for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
188         s->write_index_corrections[i].valid = 0;
189     s->current_write_index_correction = 0;
190
191     s->auto_timing_update_event = NULL;
192     s->auto_timing_update_requested = FALSE;
193     s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
194
195     reset_callbacks(s);
196
197     s->smoother = NULL;
198
199     /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
200     PA_LLIST_PREPEND(pa_stream, c->streams, s);
201     pa_stream_ref(s);
202
203     return s;
204 }
205
206 pa_stream *pa_stream_new_with_proplist(
207         pa_context *c,
208         const char *name,
209         const pa_sample_spec *ss,
210         const pa_channel_map *map,
211         pa_proplist *p) {
212
213     pa_channel_map tmap;
214
215     PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
216     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
217     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
218     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
219     PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
220
221     if (!map)
222         PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
223
224     return pa_stream_new_with_proplist_internal(c, name, ss, map, NULL, p);
225 }
226
227 pa_stream *pa_stream_new_extended(
228         pa_context *c,
229         const char *name,
230         pa_format_info * const *formats,
231         pa_proplist *p) {
232
233     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 21, PA_ERR_NOTSUPPORTED);
234
235     return pa_stream_new_with_proplist_internal(c, name, NULL, NULL, formats, p);
236 }
237
238 static void stream_unlink(pa_stream *s) {
239     pa_operation *o, *n;
240     pa_assert(s);
241
242     if (!s->context)
243         return;
244
245     /* Detach from context */
246
247     /* Unref all operatio object that point to us */
248     for (o = s->context->operations; o; o = n) {
249         n = o->next;
250
251         if (o->stream == s)
252             pa_operation_cancel(o);
253     }
254
255     /* Drop all outstanding replies for this stream */
256     if (s->context->pdispatch)
257         pa_pdispatch_unregister_reply(s->context->pdispatch, s);
258
259     if (s->channel_valid) {
260         pa_hashmap_remove((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, PA_UINT32_TO_PTR(s->channel));
261         s->channel = 0;
262         s->channel_valid = FALSE;
263     }
264
265     PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
266     pa_stream_unref(s);
267
268     s->context = NULL;
269
270     if (s->auto_timing_update_event) {
271         pa_assert(s->mainloop);
272         s->mainloop->time_free(s->auto_timing_update_event);
273     }
274
275     reset_callbacks(s);
276 }
277
278 static void stream_free(pa_stream *s) {
279     unsigned int i;
280
281     pa_assert(s);
282
283     stream_unlink(s);
284
285     if (s->write_memblock) {
286         pa_memblock_release(s->write_memblock);
287         pa_memblock_unref(s->write_data);
288     }
289
290     if (s->peek_memchunk.memblock) {
291         if (s->peek_data)
292             pa_memblock_release(s->peek_memchunk.memblock);
293         pa_memblock_unref(s->peek_memchunk.memblock);
294     }
295
296     if (s->record_memblockq)
297         pa_memblockq_free(s->record_memblockq);
298
299     if (s->proplist)
300         pa_proplist_free(s->proplist);
301
302     if (s->smoother)
303         pa_smoother_free(s->smoother);
304
305     for (i = 0; i < s->n_formats; i++)
306         pa_xfree(s->req_formats[i]);
307
308     pa_xfree(s->device_name);
309     pa_xfree(s);
310 }
311
312 void pa_stream_unref(pa_stream *s) {
313     pa_assert(s);
314     pa_assert(PA_REFCNT_VALUE(s) >= 1);
315
316     if (PA_REFCNT_DEC(s) <= 0)
317         stream_free(s);
318 }
319
320 pa_stream* pa_stream_ref(pa_stream *s) {
321     pa_assert(s);
322     pa_assert(PA_REFCNT_VALUE(s) >= 1);
323
324     PA_REFCNT_INC(s);
325     return s;
326 }
327
328 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
329     pa_assert(s);
330     pa_assert(PA_REFCNT_VALUE(s) >= 1);
331
332     return s->state;
333 }
334
335 pa_context* pa_stream_get_context(pa_stream *s) {
336     pa_assert(s);
337     pa_assert(PA_REFCNT_VALUE(s) >= 1);
338
339     return s->context;
340 }
341
342 uint32_t pa_stream_get_index(pa_stream *s) {
343     pa_assert(s);
344     pa_assert(PA_REFCNT_VALUE(s) >= 1);
345
346     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
347     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
348
349     return s->stream_index;
350 }
351
352 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
353     pa_assert(s);
354     pa_assert(PA_REFCNT_VALUE(s) >= 1);
355
356     if (s->state == st)
357         return;
358
359     pa_stream_ref(s);
360
361     s->state = st;
362
363     if (s->state_callback)
364         s->state_callback(s, s->state_userdata);
365
366     if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
367         stream_unlink(s);
368
369     pa_stream_unref(s);
370 }
371
372 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
373     pa_assert(s);
374     pa_assert(PA_REFCNT_VALUE(s) >= 1);
375
376     if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
377         return;
378
379     if (s->state == PA_STREAM_READY &&
380         (force || !s->auto_timing_update_requested)) {
381         pa_operation *o;
382
383 /*         pa_log("Automatically requesting new timing data"); */
384
385         if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
386             pa_operation_unref(o);
387             s->auto_timing_update_requested = TRUE;
388         }
389     }
390
391     if (s->auto_timing_update_event) {
392         if (force)
393             s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
394
395         pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
396
397         s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
398     }
399 }
400
401 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
402     pa_context *c = userdata;
403     pa_stream *s;
404     uint32_t channel;
405
406     pa_assert(pd);
407     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
408     pa_assert(t);
409     pa_assert(c);
410     pa_assert(PA_REFCNT_VALUE(c) >= 1);
411
412     pa_context_ref(c);
413
414     if (pa_tagstruct_getu32(t, &channel) < 0 ||
415         !pa_tagstruct_eof(t)) {
416         pa_context_fail(c, PA_ERR_PROTOCOL);
417         goto finish;
418     }
419
420     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
421         goto finish;
422
423     if (s->state != PA_STREAM_READY)
424         goto finish;
425
426     pa_context_set_error(c, PA_ERR_KILLED);
427     pa_stream_set_state(s, PA_STREAM_FAILED);
428
429 finish:
430     pa_context_unref(c);
431 }
432
433 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
434     pa_usec_t x;
435
436     pa_assert(s);
437     pa_assert(!force_start || !force_stop);
438
439     if (!s->smoother)
440         return;
441
442     x = pa_rtclock_now();
443
444     if (s->timing_info_valid) {
445         if (aposteriori)
446             x -= s->timing_info.transport_usec;
447         else
448             x += s->timing_info.transport_usec;
449     }
450
451     if (s->suspended || s->corked || force_stop)
452         pa_smoother_pause(s->smoother, x);
453     else if (force_start || s->buffer_attr.prebuf == 0) {
454
455         if (!s->timing_info_valid &&
456             !aposteriori &&
457             !force_start &&
458             !force_stop &&
459             s->context->version >= 13) {
460
461             /* If the server supports STARTED events we take them as
462              * indications when audio really starts/stops playing, if
463              * we don't have any timing info yet -- instead of trying
464              * to be smart and guessing the server time. Otherwise the
465              * unknown transport delay add too much noise to our time
466              * calculations. */
467
468             return;
469         }
470
471         pa_smoother_resume(s->smoother, x, TRUE);
472     }
473
474     /* Please note that we have no idea if playback actually started
475      * if prebuf is non-zero! */
476 }
477
478 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
479     pa_context *c = userdata;
480     pa_stream *s;
481     uint32_t channel;
482     const char *dn;
483     pa_bool_t suspended;
484     uint32_t di;
485     pa_usec_t usec = 0;
486     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
487
488     pa_assert(pd);
489     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
490     pa_assert(t);
491     pa_assert(c);
492     pa_assert(PA_REFCNT_VALUE(c) >= 1);
493
494     pa_context_ref(c);
495
496     if (c->version < 12) {
497         pa_context_fail(c, PA_ERR_PROTOCOL);
498         goto finish;
499     }
500
501     if (pa_tagstruct_getu32(t, &channel) < 0 ||
502         pa_tagstruct_getu32(t, &di) < 0 ||
503         pa_tagstruct_gets(t, &dn) < 0 ||
504         pa_tagstruct_get_boolean(t, &suspended) < 0) {
505         pa_context_fail(c, PA_ERR_PROTOCOL);
506         goto finish;
507     }
508
509     if (c->version >= 13) {
510
511         if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
512             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
513                 pa_tagstruct_getu32(t, &fragsize) < 0 ||
514                 pa_tagstruct_get_usec(t, &usec) < 0) {
515                 pa_context_fail(c, PA_ERR_PROTOCOL);
516                 goto finish;
517             }
518         } else {
519             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
520                 pa_tagstruct_getu32(t, &tlength) < 0 ||
521                 pa_tagstruct_getu32(t, &prebuf) < 0 ||
522                 pa_tagstruct_getu32(t, &minreq) < 0 ||
523                 pa_tagstruct_get_usec(t, &usec) < 0) {
524                 pa_context_fail(c, PA_ERR_PROTOCOL);
525                 goto finish;
526             }
527         }
528     }
529
530     if (!pa_tagstruct_eof(t)) {
531         pa_context_fail(c, PA_ERR_PROTOCOL);
532         goto finish;
533     }
534
535     if (!dn || di == PA_INVALID_INDEX) {
536         pa_context_fail(c, PA_ERR_PROTOCOL);
537         goto finish;
538     }
539
540     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
541         goto finish;
542
543     if (s->state != PA_STREAM_READY)
544         goto finish;
545
546     if (c->version >= 13) {
547         if (s->direction == PA_STREAM_RECORD)
548             s->timing_info.configured_source_usec = usec;
549         else
550             s->timing_info.configured_sink_usec = usec;
551
552         s->buffer_attr.maxlength = maxlength;
553         s->buffer_attr.fragsize = fragsize;
554         s->buffer_attr.tlength = tlength;
555         s->buffer_attr.prebuf = prebuf;
556         s->buffer_attr.minreq = minreq;
557     }
558
559     pa_xfree(s->device_name);
560     s->device_name = pa_xstrdup(dn);
561     s->device_index = di;
562
563     s->suspended = suspended;
564
565     check_smoother_status(s, TRUE, FALSE, FALSE);
566     request_auto_timing_update(s, TRUE);
567
568     if (s->moved_callback)
569         s->moved_callback(s, s->moved_userdata);
570
571 finish:
572     pa_context_unref(c);
573 }
574
575 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
576     pa_context *c = userdata;
577     pa_stream *s;
578     uint32_t channel;
579     pa_usec_t usec = 0;
580     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
581
582     pa_assert(pd);
583     pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
584     pa_assert(t);
585     pa_assert(c);
586     pa_assert(PA_REFCNT_VALUE(c) >= 1);
587
588     pa_context_ref(c);
589
590     if (c->version < 15) {
591         pa_context_fail(c, PA_ERR_PROTOCOL);
592         goto finish;
593     }
594
595     if (pa_tagstruct_getu32(t, &channel) < 0) {
596         pa_context_fail(c, PA_ERR_PROTOCOL);
597         goto finish;
598     }
599
600     if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
601         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
602             pa_tagstruct_getu32(t, &fragsize) < 0 ||
603             pa_tagstruct_get_usec(t, &usec) < 0) {
604             pa_context_fail(c, PA_ERR_PROTOCOL);
605             goto finish;
606         }
607     } else {
608         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
609             pa_tagstruct_getu32(t, &tlength) < 0 ||
610             pa_tagstruct_getu32(t, &prebuf) < 0 ||
611             pa_tagstruct_getu32(t, &minreq) < 0 ||
612             pa_tagstruct_get_usec(t, &usec) < 0) {
613             pa_context_fail(c, PA_ERR_PROTOCOL);
614             goto finish;
615         }
616     }
617
618     if (!pa_tagstruct_eof(t)) {
619         pa_context_fail(c, PA_ERR_PROTOCOL);
620         goto finish;
621     }
622
623     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
624         goto finish;
625
626     if (s->state != PA_STREAM_READY)
627         goto finish;
628
629     if (s->direction == PA_STREAM_RECORD)
630         s->timing_info.configured_source_usec = usec;
631     else
632         s->timing_info.configured_sink_usec = usec;
633
634     s->buffer_attr.maxlength = maxlength;
635     s->buffer_attr.fragsize = fragsize;
636     s->buffer_attr.tlength = tlength;
637     s->buffer_attr.prebuf = prebuf;
638     s->buffer_attr.minreq = minreq;
639
640     request_auto_timing_update(s, TRUE);
641
642     if (s->buffer_attr_callback)
643         s->buffer_attr_callback(s, s->buffer_attr_userdata);
644
645 finish:
646     pa_context_unref(c);
647 }
648
649 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
650     pa_context *c = userdata;
651     pa_stream *s;
652     uint32_t channel;
653     pa_bool_t suspended;
654
655     pa_assert(pd);
656     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
657     pa_assert(t);
658     pa_assert(c);
659     pa_assert(PA_REFCNT_VALUE(c) >= 1);
660
661     pa_context_ref(c);
662
663     if (c->version < 12) {
664         pa_context_fail(c, PA_ERR_PROTOCOL);
665         goto finish;
666     }
667
668     if (pa_tagstruct_getu32(t, &channel) < 0 ||
669         pa_tagstruct_get_boolean(t, &suspended) < 0 ||
670         !pa_tagstruct_eof(t)) {
671         pa_context_fail(c, PA_ERR_PROTOCOL);
672         goto finish;
673     }
674
675     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
676         goto finish;
677
678     if (s->state != PA_STREAM_READY)
679         goto finish;
680
681     s->suspended = suspended;
682
683     check_smoother_status(s, TRUE, FALSE, FALSE);
684     request_auto_timing_update(s, TRUE);
685
686     if (s->suspended_callback)
687         s->suspended_callback(s, s->suspended_userdata);
688
689 finish:
690     pa_context_unref(c);
691 }
692
693 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
694     pa_context *c = userdata;
695     pa_stream *s;
696     uint32_t channel;
697
698     pa_assert(pd);
699     pa_assert(command == PA_COMMAND_STARTED);
700     pa_assert(t);
701     pa_assert(c);
702     pa_assert(PA_REFCNT_VALUE(c) >= 1);
703
704     pa_context_ref(c);
705
706     if (c->version < 13) {
707         pa_context_fail(c, PA_ERR_PROTOCOL);
708         goto finish;
709     }
710
711     if (pa_tagstruct_getu32(t, &channel) < 0 ||
712         !pa_tagstruct_eof(t)) {
713         pa_context_fail(c, PA_ERR_PROTOCOL);
714         goto finish;
715     }
716
717     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
718         goto finish;
719
720     if (s->state != PA_STREAM_READY)
721         goto finish;
722
723     check_smoother_status(s, TRUE, TRUE, FALSE);
724     request_auto_timing_update(s, TRUE);
725
726     if (s->started_callback)
727         s->started_callback(s, s->started_userdata);
728
729 finish:
730     pa_context_unref(c);
731 }
732
733 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
734     pa_context *c = userdata;
735     pa_stream *s;
736     uint32_t channel;
737     pa_proplist *pl = NULL;
738     const char *event;
739
740     pa_assert(pd);
741     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
742     pa_assert(t);
743     pa_assert(c);
744     pa_assert(PA_REFCNT_VALUE(c) >= 1);
745
746     pa_context_ref(c);
747
748     if (c->version < 15) {
749         pa_context_fail(c, PA_ERR_PROTOCOL);
750         goto finish;
751     }
752
753     pl = pa_proplist_new();
754
755     if (pa_tagstruct_getu32(t, &channel) < 0 ||
756         pa_tagstruct_gets(t, &event) < 0 ||
757         pa_tagstruct_get_proplist(t, pl) < 0 ||
758         !pa_tagstruct_eof(t) || !event) {
759         pa_context_fail(c, PA_ERR_PROTOCOL);
760         goto finish;
761     }
762
763     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
764         goto finish;
765
766     if (s->state != PA_STREAM_READY)
767         goto finish;
768
769     if (pa_streq(event, PA_STREAM_EVENT_FORMAT_LOST)) {
770         /* Let client know what the running time was when the stream had to be
771          * killed  */
772         pa_usec_t time;
773         if (pa_stream_get_time(s, &time) == 0)
774             pa_proplist_setf(pl, "stream-time", "%llu", (unsigned long long) time);
775     }
776
777     if (s->event_callback)
778         s->event_callback(s, event, pl, s->event_userdata);
779
780 finish:
781     pa_context_unref(c);
782
783     if (pl)
784         pa_proplist_free(pl);
785 }
786
787 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
788     pa_stream *s;
789     pa_context *c = userdata;
790     uint32_t bytes, channel;
791
792     pa_assert(pd);
793     pa_assert(command == PA_COMMAND_REQUEST);
794     pa_assert(t);
795     pa_assert(c);
796     pa_assert(PA_REFCNT_VALUE(c) >= 1);
797
798     pa_context_ref(c);
799
800     if (pa_tagstruct_getu32(t, &channel) < 0 ||
801         pa_tagstruct_getu32(t, &bytes) < 0 ||
802         !pa_tagstruct_eof(t)) {
803         pa_context_fail(c, PA_ERR_PROTOCOL);
804         goto finish;
805     }
806
807     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
808         goto finish;
809
810     if (s->state != PA_STREAM_READY)
811         goto finish;
812
813     s->requested_bytes += bytes;
814
815     /* pa_log("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes); */
816
817     if (s->requested_bytes > 0 && s->write_callback)
818         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
819
820 finish:
821     pa_context_unref(c);
822 }
823
824 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
825     pa_stream *s;
826     pa_context *c = userdata;
827     uint32_t channel;
828
829     pa_assert(pd);
830     pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
831     pa_assert(t);
832     pa_assert(c);
833     pa_assert(PA_REFCNT_VALUE(c) >= 1);
834
835     pa_context_ref(c);
836
837     if (pa_tagstruct_getu32(t, &channel) < 0 ||
838         !pa_tagstruct_eof(t)) {
839         pa_context_fail(c, PA_ERR_PROTOCOL);
840         goto finish;
841     }
842
843     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
844         goto finish;
845
846     if (s->state != PA_STREAM_READY)
847         goto finish;
848
849     if (s->buffer_attr.prebuf > 0)
850         check_smoother_status(s, TRUE, FALSE, TRUE);
851
852     request_auto_timing_update(s, TRUE);
853
854     if (command == PA_COMMAND_OVERFLOW) {
855         if (s->overflow_callback)
856             s->overflow_callback(s, s->overflow_userdata);
857     } else if (command == PA_COMMAND_UNDERFLOW) {
858         if (s->underflow_callback)
859             s->underflow_callback(s, s->underflow_userdata);
860     }
861
862 finish:
863     pa_context_unref(c);
864 }
865
866 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
867     pa_assert(s);
868     pa_assert(PA_REFCNT_VALUE(s) >= 1);
869
870 /*     pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
871
872     if (s->state != PA_STREAM_READY)
873         return;
874
875     if (w) {
876         s->write_index_not_before = s->context->ctag;
877
878         if (s->timing_info_valid)
879             s->timing_info.write_index_corrupt = TRUE;
880
881 /*         pa_log("write_index invalidated"); */
882     }
883
884     if (r) {
885         s->read_index_not_before = s->context->ctag;
886
887         if (s->timing_info_valid)
888             s->timing_info.read_index_corrupt = TRUE;
889
890 /*         pa_log("read_index invalidated"); */
891     }
892
893     request_auto_timing_update(s, TRUE);
894 }
895
896 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
897     pa_stream *s = userdata;
898
899     pa_assert(s);
900     pa_assert(PA_REFCNT_VALUE(s) >= 1);
901
902     pa_stream_ref(s);
903     request_auto_timing_update(s, FALSE);
904     pa_stream_unref(s);
905 }
906
907 static void create_stream_complete(pa_stream *s) {
908     pa_assert(s);
909     pa_assert(PA_REFCNT_VALUE(s) >= 1);
910     pa_assert(s->state == PA_STREAM_CREATING);
911
912     pa_stream_set_state(s, PA_STREAM_READY);
913
914     if (s->requested_bytes > 0 && s->write_callback)
915         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
916
917     if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
918         s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
919         pa_assert(!s->auto_timing_update_event);
920         s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
921
922         request_auto_timing_update(s, TRUE);
923     }
924
925     check_smoother_status(s, TRUE, FALSE, FALSE);
926 }
927
928 static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
929     const char *e;
930
931     pa_assert(s);
932     pa_assert(attr);
933
934     if ((e = getenv("PULSE_LATENCY_MSEC"))) {
935         uint32_t ms;
936
937         if (pa_atou(e, &ms) < 0 || ms <= 0)
938             pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
939         else {
940             attr->maxlength = (uint32_t) -1;
941             attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &s->sample_spec);
942             attr->minreq = (uint32_t) -1;
943             attr->prebuf = (uint32_t) -1;
944             attr->fragsize = attr->tlength;
945         }
946
947         if (flags)
948             *flags |= PA_STREAM_ADJUST_LATENCY;
949     }
950
951     if (s->context->version >= 13)
952         return;
953
954     /* Version older than 0.9.10 didn't do server side buffer_attr
955      * selection, hence we have to fake it on the client side. */
956
957     /* We choose fairly conservative values here, to not confuse
958      * old clients with extremely large playback buffers */
959
960     if (attr->maxlength == (uint32_t) -1)
961         attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
962
963     if (attr->tlength == (uint32_t) -1)
964         attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */
965
966     if (attr->minreq == (uint32_t) -1)
967         attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
968
969     if (attr->prebuf == (uint32_t) -1)
970         attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
971
972     if (attr->fragsize == (uint32_t) -1)
973         attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
974 }
975
976 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
977     pa_stream *s = userdata;
978     uint32_t requested_bytes = 0;
979
980     pa_assert(pd);
981     pa_assert(s);
982     pa_assert(PA_REFCNT_VALUE(s) >= 1);
983     pa_assert(s->state == PA_STREAM_CREATING);
984
985     pa_stream_ref(s);
986
987     if (command != PA_COMMAND_REPLY) {
988         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
989             goto finish;
990
991         pa_stream_set_state(s, PA_STREAM_FAILED);
992         goto finish;
993     }
994
995     if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
996         s->channel == PA_INVALID_INDEX ||
997         ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
998         ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
999         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1000         goto finish;
1001     }
1002
1003     s->requested_bytes = (int64_t) requested_bytes;
1004
1005     if (s->context->version >= 9) {
1006         if (s->direction == PA_STREAM_PLAYBACK) {
1007             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1008                 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
1009                 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
1010                 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
1011                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1012                 goto finish;
1013             }
1014         } else if (s->direction == PA_STREAM_RECORD) {
1015             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1016                 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
1017                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1018                 goto finish;
1019             }
1020         }
1021     }
1022
1023     if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
1024         pa_sample_spec ss;
1025         pa_channel_map cm;
1026         const char *dn = NULL;
1027         pa_bool_t suspended;
1028
1029         if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1030             pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1031             pa_tagstruct_getu32(t, &s->device_index) < 0 ||
1032             pa_tagstruct_gets(t, &dn) < 0 ||
1033             pa_tagstruct_get_boolean(t, &suspended) < 0) {
1034             pa_context_fail(s->context, PA_ERR_PROTOCOL);
1035             goto finish;
1036         }
1037
1038         if (!dn || s->device_index == PA_INVALID_INDEX ||
1039             ss.channels != cm.channels ||
1040             !pa_channel_map_valid(&cm) ||
1041             !pa_sample_spec_valid(&ss) ||
1042             (s->n_formats == 0 && (
1043                 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
1044                 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
1045                 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))))) {
1046             pa_context_fail(s->context, PA_ERR_PROTOCOL);
1047             goto finish;
1048         }
1049
1050         pa_xfree(s->device_name);
1051         s->device_name = pa_xstrdup(dn);
1052         s->suspended = suspended;
1053
1054         s->channel_map = cm;
1055         s->sample_spec = ss;
1056     }
1057
1058     if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
1059         pa_usec_t usec;
1060
1061         if (pa_tagstruct_get_usec(t, &usec) < 0) {
1062             pa_context_fail(s->context, PA_ERR_PROTOCOL);
1063             goto finish;
1064         }
1065
1066         if (s->direction == PA_STREAM_RECORD)
1067             s->timing_info.configured_source_usec = usec;
1068         else
1069             s->timing_info.configured_sink_usec = usec;
1070     }
1071
1072     if (s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK) {
1073         pa_format_info *f = pa_format_info_new();
1074         pa_tagstruct_get_format_info(t, f);
1075
1076         if (pa_format_info_valid(f))
1077             s->format = f;
1078         else {
1079             pa_format_info_free(f);
1080             if (s->n_formats > 0) {
1081                 /* We used the extended API, so we should have got back a proper format */
1082                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1083                 goto finish;
1084             }
1085         }
1086     }
1087
1088     if (!pa_tagstruct_eof(t)) {
1089         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1090         goto finish;
1091     }
1092
1093     if (s->direction == PA_STREAM_RECORD) {
1094         pa_assert(!s->record_memblockq);
1095
1096         s->record_memblockq = pa_memblockq_new(
1097                 0,
1098                 s->buffer_attr.maxlength,
1099                 0,
1100                 pa_frame_size(&s->sample_spec),
1101                 1,
1102                 0,
1103                 0,
1104                 NULL);
1105     }
1106
1107     s->channel_valid = TRUE;
1108     pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);
1109
1110     create_stream_complete(s);
1111
1112 finish:
1113     pa_stream_unref(s);
1114 }
1115
1116 static int create_stream(
1117         pa_stream_direction_t direction,
1118         pa_stream *s,
1119         const char *dev,
1120         const pa_buffer_attr *attr,
1121         pa_stream_flags_t flags,
1122         const pa_cvolume *volume,
1123         pa_stream *sync_stream) {
1124
1125     pa_tagstruct *t;
1126     uint32_t tag;
1127     pa_bool_t volume_set = FALSE;
1128     uint32_t i;
1129
1130     pa_assert(s);
1131     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1132     pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1133
1134     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1135     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1136     PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1137     PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1138                                               PA_STREAM_INTERPOLATE_TIMING|
1139                                               PA_STREAM_NOT_MONOTONIC|
1140                                               PA_STREAM_AUTO_TIMING_UPDATE|
1141                                               PA_STREAM_NO_REMAP_CHANNELS|
1142                                               PA_STREAM_NO_REMIX_CHANNELS|
1143                                               PA_STREAM_FIX_FORMAT|
1144                                               PA_STREAM_FIX_RATE|
1145                                               PA_STREAM_FIX_CHANNELS|
1146                                               PA_STREAM_DONT_MOVE|
1147                                               PA_STREAM_VARIABLE_RATE|
1148                                               PA_STREAM_PEAK_DETECT|
1149                                               PA_STREAM_START_MUTED|
1150                                               PA_STREAM_ADJUST_LATENCY|
1151                                               PA_STREAM_EARLY_REQUESTS|
1152                                               PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1153                                               PA_STREAM_START_UNMUTED|
1154                                               PA_STREAM_FAIL_ON_SUSPEND|
1155                                               PA_STREAM_RELATIVE_VOLUME|
1156                                               PA_STREAM_PASSTHROUGH)), PA_ERR_INVALID);
1157
1158
1159     PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1160     PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1161     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1162     /* Althought some of the other flags are not supported on older
1163      * version, we don't check for them here, because it doesn't hurt
1164      * when they are passed but actually not supported. This makes
1165      * client development easier */
1166
1167     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1168     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1169     PA_CHECK_VALIDITY(s->context, !volume || (pa_sample_spec_valid(&s->sample_spec) && volume->channels == s->sample_spec.channels), PA_ERR_INVALID);
1170     PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1171     PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1172
1173     pa_stream_ref(s);
1174
1175     s->direction = direction;
1176
1177     if (sync_stream)
1178         s->syncid = sync_stream->syncid;
1179
1180     if (attr)
1181         s->buffer_attr = *attr;
1182     patch_buffer_attr(s, &s->buffer_attr, &flags);
1183
1184     s->flags = flags;
1185     s->corked = !!(flags & PA_STREAM_START_CORKED);
1186
1187     if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1188         pa_usec_t x;
1189
1190         x = pa_rtclock_now();
1191
1192         pa_assert(!s->smoother);
1193         s->smoother = pa_smoother_new(
1194                 SMOOTHER_ADJUST_TIME,
1195                 SMOOTHER_HISTORY_TIME,
1196                 !(flags & PA_STREAM_NOT_MONOTONIC),
1197                 TRUE,
1198                 SMOOTHER_MIN_HISTORY,
1199                 x,
1200                 TRUE);
1201     }
1202
1203     if (!dev)
1204         dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1205
1206     t = pa_tagstruct_command(
1207             s->context,
1208             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1209             &tag);
1210
1211     if (s->context->version < 13)
1212         pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1213
1214     pa_tagstruct_put(
1215             t,
1216             PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1217             PA_TAG_CHANNEL_MAP, &s->channel_map,
1218             PA_TAG_U32, PA_INVALID_INDEX,
1219             PA_TAG_STRING, dev,
1220             PA_TAG_U32, s->buffer_attr.maxlength,
1221             PA_TAG_BOOLEAN, s->corked,
1222             PA_TAG_INVALID);
1223
1224     if (s->direction == PA_STREAM_PLAYBACK) {
1225         pa_cvolume cv;
1226
1227         pa_tagstruct_put(
1228                 t,
1229                 PA_TAG_U32, s->buffer_attr.tlength,
1230                 PA_TAG_U32, s->buffer_attr.prebuf,
1231                 PA_TAG_U32, s->buffer_attr.minreq,
1232                 PA_TAG_U32, s->syncid,
1233                 PA_TAG_INVALID);
1234
1235         volume_set = !!volume;
1236
1237         if (!volume) {
1238             if (pa_sample_spec_valid(&s->sample_spec))
1239                 volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1240             else {
1241                 /* This is not really relevant, since no volume was set, and
1242                  * the real number of channels is embedded in the format_info
1243                  * structure */
1244                 volume = pa_cvolume_reset(&cv, PA_CHANNELS_MAX);
1245             }
1246         }
1247
1248         pa_tagstruct_put_cvolume(t, volume);
1249     } else
1250         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1251
1252     if (s->context->version >= 12) {
1253         pa_tagstruct_put(
1254                 t,
1255                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1256                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1257                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1258                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1259                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1260                 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1261                 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1262                 PA_TAG_INVALID);
1263     }
1264
1265     if (s->context->version >= 13) {
1266
1267         if (s->direction == PA_STREAM_PLAYBACK)
1268             pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1269         else
1270             pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1271
1272         pa_tagstruct_put(
1273                 t,
1274                 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1275                 PA_TAG_PROPLIST, s->proplist,
1276                 PA_TAG_INVALID);
1277
1278         if (s->direction == PA_STREAM_RECORD)
1279             pa_tagstruct_putu32(t, s->direct_on_input);
1280     }
1281
1282     if (s->context->version >= 14) {
1283
1284         if (s->direction == PA_STREAM_PLAYBACK)
1285             pa_tagstruct_put_boolean(t, volume_set);
1286
1287         pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1288     }
1289
1290     if (s->context->version >= 15) {
1291
1292         if (s->direction == PA_STREAM_PLAYBACK)
1293             pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1294
1295         pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1296         pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1297     }
1298
1299     if (s->context->version >= 17) {
1300
1301         if (s->direction == PA_STREAM_PLAYBACK)
1302             pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1303
1304     }
1305
1306     if (s->context->version >= 18) {
1307
1308         if (s->direction == PA_STREAM_PLAYBACK)
1309             pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1310     }
1311
1312     if (s->context->version >= 21) {
1313
1314         if (s->direction == PA_STREAM_PLAYBACK) {
1315             pa_tagstruct_putu8(t, s->n_formats);
1316             for (i = 0; i < s->n_formats; i++)
1317                 pa_tagstruct_put_format_info(t, s->req_formats[i]);
1318         }
1319     }
1320
1321     pa_pstream_send_tagstruct(s->context->pstream, t);
1322     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1323
1324     pa_stream_set_state(s, PA_STREAM_CREATING);
1325
1326     pa_stream_unref(s);
1327     return 0;
1328 }
1329
1330 int pa_stream_connect_playback(
1331         pa_stream *s,
1332         const char *dev,
1333         const pa_buffer_attr *attr,
1334         pa_stream_flags_t flags,
1335         const pa_cvolume *volume,
1336         pa_stream *sync_stream) {
1337
1338     pa_assert(s);
1339     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1340
1341     return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1342 }
1343
1344 int pa_stream_connect_record(
1345         pa_stream *s,
1346         const char *dev,
1347         const pa_buffer_attr *attr,
1348         pa_stream_flags_t flags) {
1349
1350     pa_assert(s);
1351     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1352
1353     return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1354 }
1355
1356 int pa_stream_begin_write(
1357         pa_stream *s,
1358         void **data,
1359         size_t *nbytes) {
1360
1361     pa_assert(s);
1362     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1363
1364     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1365     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1366     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1367     PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1368     PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1369
1370     if (*nbytes != (size_t) -1) {
1371         size_t m, fs;
1372
1373         m = pa_mempool_block_size_max(s->context->mempool);
1374         fs = pa_frame_size(&s->sample_spec);
1375
1376         m = (m / fs) * fs;
1377         if (*nbytes > m)
1378             *nbytes = m;
1379     }
1380
1381     if (!s->write_memblock) {
1382         s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1383         s->write_data = pa_memblock_acquire(s->write_memblock);
1384     }
1385
1386     *data = s->write_data;
1387     *nbytes = pa_memblock_get_length(s->write_memblock);
1388
1389     return 0;
1390 }
1391
1392 int pa_stream_cancel_write(
1393         pa_stream *s) {
1394
1395     pa_assert(s);
1396     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1397
1398     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1399     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1400     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1401     PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1402
1403     pa_assert(s->write_data);
1404
1405     pa_memblock_release(s->write_memblock);
1406     pa_memblock_unref(s->write_memblock);
1407     s->write_memblock = NULL;
1408     s->write_data = NULL;
1409
1410     return 0;
1411 }
1412
1413 int pa_stream_write(
1414         pa_stream *s,
1415         const void *data,
1416         size_t length,
1417         pa_free_cb_t free_cb,
1418         int64_t offset,
1419         pa_seek_mode_t seek) {
1420
1421     pa_assert(s);
1422     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1423     pa_assert(data);
1424
1425     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1426     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1427     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1428     PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1429     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1430     PA_CHECK_VALIDITY(s->context,
1431                       !s->write_memblock ||
1432                       ((data >= s->write_data) &&
1433                        ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1434                       PA_ERR_INVALID);
1435     PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1436
1437     if (s->write_memblock) {
1438         pa_memchunk chunk;
1439
1440         /* pa_stream_write_begin() was called before */
1441
1442         pa_memblock_release(s->write_memblock);
1443
1444         chunk.memblock = s->write_memblock;
1445         chunk.index = (const char *) data - (const char *) s->write_data;
1446         chunk.length = length;
1447
1448         s->write_memblock = NULL;
1449         s->write_data = NULL;
1450
1451         pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1452         pa_memblock_unref(chunk.memblock);
1453
1454     } else {
1455         pa_seek_mode_t t_seek = seek;
1456         int64_t t_offset = offset;
1457         size_t t_length = length;
1458         const void *t_data = data;
1459
1460         /* pa_stream_write_begin() was not called before */
1461
1462         while (t_length > 0) {
1463             pa_memchunk chunk;
1464
1465             chunk.index = 0;
1466
1467             if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1468                 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1469                 chunk.length = t_length;
1470             } else {
1471                 void *d;
1472
1473                 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1474                 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1475
1476                 d = pa_memblock_acquire(chunk.memblock);
1477                 memcpy(d, t_data, chunk.length);
1478                 pa_memblock_release(chunk.memblock);
1479             }
1480
1481             pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1482
1483             t_offset = 0;
1484             t_seek = PA_SEEK_RELATIVE;
1485
1486             t_data = (const uint8_t*) t_data + chunk.length;
1487             t_length -= chunk.length;
1488
1489             pa_memblock_unref(chunk.memblock);
1490         }
1491
1492         if (free_cb && pa_pstream_get_shm(s->context->pstream))
1493             free_cb((void*) data);
1494     }
1495
1496     /* This is obviously wrong since we ignore the seeking index . But
1497      * that's OK, the server side applies the same error */
1498     s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1499
1500     /* pa_log("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes); */
1501
1502     if (s->direction == PA_STREAM_PLAYBACK) {
1503
1504         /* Update latency request correction */
1505         if (s->write_index_corrections[s->current_write_index_correction].valid) {
1506
1507             if (seek == PA_SEEK_ABSOLUTE) {
1508                 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1509                 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1510                 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1511             } else if (seek == PA_SEEK_RELATIVE) {
1512                 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1513                     s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1514             } else
1515                 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1516         }
1517
1518         /* Update the write index in the already available latency data */
1519         if (s->timing_info_valid) {
1520
1521             if (seek == PA_SEEK_ABSOLUTE) {
1522                 s->timing_info.write_index_corrupt = FALSE;
1523                 s->timing_info.write_index = offset + (int64_t) length;
1524             } else if (seek == PA_SEEK_RELATIVE) {
1525                 if (!s->timing_info.write_index_corrupt)
1526                     s->timing_info.write_index += offset + (int64_t) length;
1527             } else
1528                 s->timing_info.write_index_corrupt = TRUE;
1529         }
1530
1531         if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1532             request_auto_timing_update(s, TRUE);
1533     }
1534
1535     return 0;
1536 }
1537
1538 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1539     pa_assert(s);
1540     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1541     pa_assert(data);
1542     pa_assert(length);
1543
1544     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1545     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1546     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1547
1548     if (!s->peek_memchunk.memblock) {
1549
1550         if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1551             *data = NULL;
1552             *length = 0;
1553             return 0;
1554         }
1555
1556         s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1557     }
1558
1559     pa_assert(s->peek_data);
1560     *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1561     *length = s->peek_memchunk.length;
1562     return 0;
1563 }
1564
1565 int pa_stream_drop(pa_stream *s) {
1566     pa_assert(s);
1567     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1568
1569     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1570     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1571     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1572     PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1573
1574     pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1575
1576     /* Fix the simulated local read index */
1577     if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1578         s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1579
1580     pa_assert(s->peek_data);
1581     pa_memblock_release(s->peek_memchunk.memblock);
1582     pa_memblock_unref(s->peek_memchunk.memblock);
1583     pa_memchunk_reset(&s->peek_memchunk);
1584
1585     return 0;
1586 }
1587
1588 size_t pa_stream_writable_size(pa_stream *s) {
1589     pa_assert(s);
1590     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1591
1592     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1593     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1594     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1595
1596     return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1597 }
1598
1599 size_t pa_stream_readable_size(pa_stream *s) {
1600     pa_assert(s);
1601     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1602
1603     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1604     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1605     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1606
1607     return pa_memblockq_get_length(s->record_memblockq);
1608 }
1609
1610 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1611     pa_operation *o;
1612     pa_tagstruct *t;
1613     uint32_t tag;
1614
1615     pa_assert(s);
1616     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1617
1618     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1619     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1620     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1621
1622     /* Ask for a timing update before we cork/uncork to get the best
1623      * accuracy for the transport latency suitable for the
1624      * check_smoother_status() call in the started callback */
1625     request_auto_timing_update(s, TRUE);
1626
1627     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1628
1629     t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1630     pa_tagstruct_putu32(t, s->channel);
1631     pa_pstream_send_tagstruct(s->context->pstream, t);
1632     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1633
1634     /* This might cause the read index to conitnue again, hence
1635      * let's request a timing update */
1636     request_auto_timing_update(s, TRUE);
1637
1638     return o;
1639 }
1640
1641 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1642     pa_usec_t usec;
1643
1644     pa_assert(s);
1645     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1646     pa_assert(s->state == PA_STREAM_READY);
1647     pa_assert(s->direction != PA_STREAM_UPLOAD);
1648     pa_assert(s->timing_info_valid);
1649     pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1650     pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1651
1652     if (s->direction == PA_STREAM_PLAYBACK) {
1653         /* The last byte that was written into the output device
1654          * had this time value associated */
1655         usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1656
1657         if (!s->corked && !s->suspended) {
1658
1659             if (!ignore_transport)
1660                 /* Because the latency info took a little time to come
1661                  * to us, we assume that the real output time is actually
1662                  * a little ahead */
1663                 usec += s->timing_info.transport_usec;
1664
1665             /* However, the output device usually maintains a buffer
1666                too, hence the real sample currently played is a little
1667                back  */
1668             if (s->timing_info.sink_usec >= usec)
1669                 usec = 0;
1670             else
1671                 usec -= s->timing_info.sink_usec;
1672         }
1673
1674     } else {
1675         pa_assert(s->direction == PA_STREAM_RECORD);
1676
1677         /* The last byte written into the server side queue had
1678          * this time value associated */
1679         usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1680
1681         if (!s->corked && !s->suspended) {
1682
1683             if (!ignore_transport)
1684                 /* Add transport latency */
1685                 usec += s->timing_info.transport_usec;
1686
1687             /* Add latency of data in device buffer */
1688             usec += s->timing_info.source_usec;
1689
1690             /* If this is a monitor source, we need to correct the
1691              * time by the playback device buffer */
1692             if (s->timing_info.sink_usec >= usec)
1693                 usec = 0;
1694             else
1695                 usec -= s->timing_info.sink_usec;
1696         }
1697     }
1698
1699     return usec;
1700 }
1701
1702 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1703     pa_operation *o = userdata;
1704     struct timeval local, remote, now;
1705     pa_timing_info *i;
1706     pa_bool_t playing = FALSE;
1707     uint64_t underrun_for = 0, playing_for = 0;
1708
1709     pa_assert(pd);
1710     pa_assert(o);
1711     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1712
1713     if (!o->context || !o->stream)
1714         goto finish;
1715
1716     i = &o->stream->timing_info;
1717
1718     o->stream->timing_info_valid = FALSE;
1719     i->write_index_corrupt = TRUE;
1720     i->read_index_corrupt = TRUE;
1721
1722     if (command != PA_COMMAND_REPLY) {
1723         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1724             goto finish;
1725
1726     } else {
1727
1728         if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1729             pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1730             pa_tagstruct_get_boolean(t, &playing) < 0 ||
1731             pa_tagstruct_get_timeval(t, &local) < 0 ||
1732             pa_tagstruct_get_timeval(t, &remote) < 0 ||
1733             pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1734             pa_tagstruct_gets64(t, &i->read_index) < 0) {
1735
1736             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1737             goto finish;
1738         }
1739
1740         if (o->context->version >= 13 &&
1741             o->stream->direction == PA_STREAM_PLAYBACK)
1742             if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1743                 pa_tagstruct_getu64(t, &playing_for) < 0) {
1744
1745                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1746                 goto finish;
1747             }
1748
1749
1750         if (!pa_tagstruct_eof(t)) {
1751             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1752             goto finish;
1753         }
1754         o->stream->timing_info_valid = TRUE;
1755         i->write_index_corrupt = FALSE;
1756         i->read_index_corrupt = FALSE;
1757
1758         i->playing = (int) playing;
1759         i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1760
1761         pa_gettimeofday(&now);
1762
1763         /* Calculcate timestamps */
1764         if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1765             /* local and remote seem to have synchronized clocks */
1766
1767             if (o->stream->direction == PA_STREAM_PLAYBACK)
1768                 i->transport_usec = pa_timeval_diff(&remote, &local);
1769             else
1770                 i->transport_usec = pa_timeval_diff(&now, &remote);
1771
1772             i->synchronized_clocks = TRUE;
1773             i->timestamp = remote;
1774         } else {
1775             /* clocks are not synchronized, let's estimate latency then */
1776             i->transport_usec = pa_timeval_diff(&now, &local)/2;
1777             i->synchronized_clocks = FALSE;
1778             i->timestamp = local;
1779             pa_timeval_add(&i->timestamp, i->transport_usec);
1780         }
1781
1782         /* Invalidate read and write indexes if necessary */
1783         if (tag < o->stream->read_index_not_before)
1784             i->read_index_corrupt = TRUE;
1785
1786         if (tag < o->stream->write_index_not_before)
1787             i->write_index_corrupt = TRUE;
1788
1789         if (o->stream->direction == PA_STREAM_PLAYBACK) {
1790             /* Write index correction */
1791
1792             int n, j;
1793             uint32_t ctag = tag;
1794
1795             /* Go through the saved correction values and add up the
1796              * total correction.*/
1797             for (n = 0, j = o->stream->current_write_index_correction+1;
1798                  n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1799                  n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1800
1801                 /* Step over invalid data or out-of-date data */
1802                 if (!o->stream->write_index_corrections[j].valid ||
1803                     o->stream->write_index_corrections[j].tag < ctag)
1804                     continue;
1805
1806                 /* Make sure that everything is in order */
1807                 ctag = o->stream->write_index_corrections[j].tag+1;
1808
1809                 /* Now fix the write index */
1810                 if (o->stream->write_index_corrections[j].corrupt) {
1811                     /* A corrupting seek was made */
1812                     i->write_index_corrupt = TRUE;
1813                 } else if (o->stream->write_index_corrections[j].absolute) {
1814                     /* An absolute seek was made */
1815                     i->write_index = o->stream->write_index_corrections[j].value;
1816                     i->write_index_corrupt = FALSE;
1817                 } else if (!i->write_index_corrupt) {
1818                     /* A relative seek was made */
1819                     i->write_index += o->stream->write_index_corrections[j].value;
1820                 }
1821             }
1822
1823             /* Clear old correction entries */
1824             for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1825                 if (!o->stream->write_index_corrections[n].valid)
1826                     continue;
1827
1828                 if (o->stream->write_index_corrections[n].tag <= tag)
1829                     o->stream->write_index_corrections[n].valid = FALSE;
1830             }
1831         }
1832
1833         if (o->stream->direction == PA_STREAM_RECORD) {
1834             /* Read index correction */
1835
1836             if (!i->read_index_corrupt)
1837                 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1838         }
1839
1840         /* Update smoother if we're not corked */
1841         if (o->stream->smoother && !o->stream->corked) {
1842             pa_usec_t u, x;
1843
1844             u = x = pa_rtclock_now() - i->transport_usec;
1845
1846             if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1847                 pa_usec_t su;
1848
1849                 /* If we weren't playing then it will take some time
1850                  * until the audio will actually come out through the
1851                  * speakers. Since we follow that timing here, we need
1852                  * to try to fix this up */
1853
1854                 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1855
1856                 if (su < i->sink_usec)
1857                     x += i->sink_usec - su;
1858             }
1859
1860             if (!i->playing)
1861                 pa_smoother_pause(o->stream->smoother, x);
1862
1863             /* Update the smoother */
1864             if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1865                 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1866                 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1867
1868             if (i->playing)
1869                 pa_smoother_resume(o->stream->smoother, x, TRUE);
1870         }
1871     }
1872
1873     o->stream->auto_timing_update_requested = FALSE;
1874
1875     if (o->stream->latency_update_callback)
1876         o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1877
1878     if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1879         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1880         cb(o->stream, o->stream->timing_info_valid, o->userdata);
1881     }
1882
1883 finish:
1884
1885     pa_operation_done(o);
1886     pa_operation_unref(o);
1887 }
1888
1889 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1890     uint32_t tag;
1891     pa_operation *o;
1892     pa_tagstruct *t;
1893     struct timeval now;
1894     int cidx = 0;
1895
1896     pa_assert(s);
1897     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1898
1899     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1900     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1901     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1902
1903     if (s->direction == PA_STREAM_PLAYBACK) {
1904         /* Find a place to store the write_index correction data for this entry */
1905         cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1906
1907         /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1908         PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1909     }
1910     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1911
1912     t = pa_tagstruct_command(
1913             s->context,
1914             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1915             &tag);
1916     pa_tagstruct_putu32(t, s->channel);
1917     pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1918
1919     pa_pstream_send_tagstruct(s->context->pstream, t);
1920     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1921
1922     if (s->direction == PA_STREAM_PLAYBACK) {
1923         /* Fill in initial correction data */
1924
1925         s->current_write_index_correction = cidx;
1926
1927         s->write_index_corrections[cidx].valid = TRUE;
1928         s->write_index_corrections[cidx].absolute = FALSE;
1929         s->write_index_corrections[cidx].corrupt = FALSE;
1930         s->write_index_corrections[cidx].tag = tag;
1931         s->write_index_corrections[cidx].value = 0;
1932     }
1933
1934     return o;
1935 }
1936
1937 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1938     pa_stream *s = userdata;
1939
1940     pa_assert(pd);
1941     pa_assert(s);
1942     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1943
1944     pa_stream_ref(s);
1945
1946     if (command != PA_COMMAND_REPLY) {
1947         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1948             goto finish;
1949
1950         pa_stream_set_state(s, PA_STREAM_FAILED);
1951         goto finish;
1952     } else if (!pa_tagstruct_eof(t)) {
1953         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1954         goto finish;
1955     }
1956
1957     pa_stream_set_state(s, PA_STREAM_TERMINATED);
1958
1959 finish:
1960     pa_stream_unref(s);
1961 }
1962
1963 int pa_stream_disconnect(pa_stream *s) {
1964     pa_tagstruct *t;
1965     uint32_t tag;
1966
1967     pa_assert(s);
1968     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1969
1970     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1971     PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1972     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1973
1974     pa_stream_ref(s);
1975
1976     t = pa_tagstruct_command(
1977             s->context,
1978             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1979                         (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1980             &tag);
1981     pa_tagstruct_putu32(t, s->channel);
1982     pa_pstream_send_tagstruct(s->context->pstream, t);
1983     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1984
1985     pa_stream_unref(s);
1986     return 0;
1987 }
1988
1989 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1990     pa_assert(s);
1991     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1992
1993     if (pa_detect_fork())
1994         return;
1995
1996     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1997         return;
1998
1999     s->read_callback = cb;
2000     s->read_userdata = userdata;
2001 }
2002
2003 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2004     pa_assert(s);
2005     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2006
2007     if (pa_detect_fork())
2008         return;
2009
2010     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2011         return;
2012
2013     s->write_callback = cb;
2014     s->write_userdata = userdata;
2015 }
2016
2017 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2018     pa_assert(s);
2019     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2020
2021     if (pa_detect_fork())
2022         return;
2023
2024     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2025         return;
2026
2027     s->state_callback = cb;
2028     s->state_userdata = userdata;
2029 }
2030
2031 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2032     pa_assert(s);
2033     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2034
2035     if (pa_detect_fork())
2036         return;
2037
2038     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2039         return;
2040
2041     s->overflow_callback = cb;
2042     s->overflow_userdata = userdata;
2043 }
2044
2045 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2046     pa_assert(s);
2047     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2048
2049     if (pa_detect_fork())
2050         return;
2051
2052     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2053         return;
2054
2055     s->underflow_callback = cb;
2056     s->underflow_userdata = userdata;
2057 }
2058
2059 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2060     pa_assert(s);
2061     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2062
2063     if (pa_detect_fork())
2064         return;
2065
2066     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2067         return;
2068
2069     s->latency_update_callback = cb;
2070     s->latency_update_userdata = userdata;
2071 }
2072
2073 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2074     pa_assert(s);
2075     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2076
2077     if (pa_detect_fork())
2078         return;
2079
2080     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2081         return;
2082
2083     s->moved_callback = cb;
2084     s->moved_userdata = userdata;
2085 }
2086
2087 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2088     pa_assert(s);
2089     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2090
2091     if (pa_detect_fork())
2092         return;
2093
2094     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2095         return;
2096
2097     s->suspended_callback = cb;
2098     s->suspended_userdata = userdata;
2099 }
2100
2101 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2102     pa_assert(s);
2103     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2104
2105     if (pa_detect_fork())
2106         return;
2107
2108     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2109         return;
2110
2111     s->started_callback = cb;
2112     s->started_userdata = userdata;
2113 }
2114
2115 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
2116     pa_assert(s);
2117     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2118
2119     if (pa_detect_fork())
2120         return;
2121
2122     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2123         return;
2124
2125     s->event_callback = cb;
2126     s->event_userdata = userdata;
2127 }
2128
2129 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2130     pa_assert(s);
2131     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2132
2133     if (pa_detect_fork())
2134         return;
2135
2136     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2137         return;
2138
2139     s->buffer_attr_callback = cb;
2140     s->buffer_attr_userdata = userdata;
2141 }
2142
2143 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2144     pa_operation *o = userdata;
2145     int success = 1;
2146
2147     pa_assert(pd);
2148     pa_assert(o);
2149     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2150
2151     if (!o->context)
2152         goto finish;
2153
2154     if (command != PA_COMMAND_REPLY) {
2155         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2156             goto finish;
2157
2158         success = 0;
2159     } else if (!pa_tagstruct_eof(t)) {
2160         pa_context_fail(o->context, PA_ERR_PROTOCOL);
2161         goto finish;
2162     }
2163
2164     if (o->callback) {
2165         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2166         cb(o->stream, success, o->userdata);
2167     }
2168
2169 finish:
2170     pa_operation_done(o);
2171     pa_operation_unref(o);
2172 }
2173
2174 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2175     pa_operation *o;
2176     pa_tagstruct *t;
2177     uint32_t tag;
2178
2179     pa_assert(s);
2180     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2181
2182     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2183     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2184     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2185
2186     /* Ask for a timing update before we cork/uncork to get the best
2187      * accuracy for the transport latency suitable for the
2188      * check_smoother_status() call in the started callback */
2189     request_auto_timing_update(s, TRUE);
2190
2191     s->corked = b;
2192
2193     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2194
2195     t = pa_tagstruct_command(
2196             s->context,
2197             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2198             &tag);
2199     pa_tagstruct_putu32(t, s->channel);
2200     pa_tagstruct_put_boolean(t, !!b);
2201     pa_pstream_send_tagstruct(s->context->pstream, t);
2202     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2203
2204     check_smoother_status(s, FALSE, FALSE, FALSE);
2205
2206     /* This might cause the indexes to hang/start again, hence let's
2207      * request a timing update, after the cork/uncork, too */
2208     request_auto_timing_update(s, TRUE);
2209
2210     return o;
2211 }
2212
2213 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2214     pa_tagstruct *t;
2215     pa_operation *o;
2216     uint32_t tag;
2217
2218     pa_assert(s);
2219     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2220
2221     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2222     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2223
2224     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2225
2226     t = pa_tagstruct_command(s->context, command, &tag);
2227     pa_tagstruct_putu32(t, s->channel);
2228     pa_pstream_send_tagstruct(s->context->pstream, t);
2229     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2230
2231     return o;
2232 }
2233
2234 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2235     pa_operation *o;
2236
2237     pa_assert(s);
2238     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2239
2240     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2241     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2242     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2243
2244     /* Ask for a timing update *before* the flush, so that the
2245      * transport usec is as up to date as possible when we get the
2246      * underflow message and update the smoother status*/
2247     request_auto_timing_update(s, TRUE);
2248
2249     if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2250         return NULL;
2251
2252     if (s->direction == PA_STREAM_PLAYBACK) {
2253
2254         if (s->write_index_corrections[s->current_write_index_correction].valid)
2255             s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2256
2257         if (s->buffer_attr.prebuf > 0)
2258             check_smoother_status(s, FALSE, FALSE, TRUE);
2259
2260         /* This will change the write index, but leave the
2261          * read index untouched. */
2262         invalidate_indexes(s, FALSE, TRUE);
2263
2264     } else
2265         /* For record streams this has no influence on the write
2266          * index, but the read index might jump. */
2267         invalidate_indexes(s, TRUE, FALSE);
2268
2269     /* Note that we do not update requested_bytes here. This is
2270      * because we cannot really know how data actually was dropped
2271      * from the write index due to this. This 'error' will be applied
2272      * by both client and server and hence we should be fine. */
2273
2274     return o;
2275 }
2276
2277 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2278     pa_operation *o;
2279
2280     pa_assert(s);
2281     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2282
2283     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2284     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2285     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2286     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2287
2288     /* Ask for a timing update before we cork/uncork to get the best
2289      * accuracy for the transport latency suitable for the
2290      * check_smoother_status() call in the started callback */
2291     request_auto_timing_update(s, TRUE);
2292
2293     if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2294         return NULL;
2295
2296     /* This might cause the read index to hang again, hence
2297      * let's request a timing update */
2298     request_auto_timing_update(s, TRUE);
2299
2300     return o;
2301 }
2302
2303 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2304     pa_operation *o;
2305
2306     pa_assert(s);
2307     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2308
2309     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2310     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2311     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2312     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2313
2314     /* Ask for a timing update before we cork/uncork to get the best
2315      * accuracy for the transport latency suitable for the
2316      * check_smoother_status() call in the started callback */
2317     request_auto_timing_update(s, TRUE);
2318
2319     if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2320         return NULL;
2321
2322     /* This might cause the read index to start moving again, hence
2323      * let's request a timing update */
2324     request_auto_timing_update(s, TRUE);
2325
2326     return o;
2327 }
2328
2329 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2330     pa_operation *o;
2331
2332     pa_assert(s);
2333     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2334     pa_assert(name);
2335
2336     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2337     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2338     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2339
2340     if (s->context->version >= 13) {
2341         pa_proplist *p = pa_proplist_new();
2342
2343         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2344         o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2345         pa_proplist_free(p);
2346     } else {
2347         pa_tagstruct *t;
2348         uint32_t tag;
2349
2350         o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2351         t = pa_tagstruct_command(
2352                 s->context,
2353                 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2354                 &tag);
2355         pa_tagstruct_putu32(t, s->channel);
2356         pa_tagstruct_puts(t, name);
2357         pa_pstream_send_tagstruct(s->context->pstream, t);
2358         pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2359     }
2360
2361     return o;
2362 }
2363
2364 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2365     pa_usec_t usec;
2366
2367     pa_assert(s);
2368     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2369
2370     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2371     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2372     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2373     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2374     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2375     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2376
2377     if (s->smoother)
2378         usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2379     else
2380         usec = calc_time(s, FALSE);
2381
2382     /* Make sure the time runs monotonically */
2383     if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2384         if (usec < s->previous_time)
2385             usec = s->previous_time;
2386         else
2387             s->previous_time = usec;
2388     }
2389
2390     if (r_usec)
2391         *r_usec = usec;
2392
2393     return 0;
2394 }
2395
2396 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2397     pa_assert(s);
2398     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2399
2400     if (negative)
2401         *negative = 0;
2402
2403     if (a >= b)
2404         return a-b;
2405     else {
2406         if (negative && s->direction == PA_STREAM_RECORD) {
2407             *negative = 1;
2408             return b-a;
2409         } else
2410             return 0;
2411     }
2412 }
2413
2414 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2415     pa_usec_t t, c;
2416     int r;
2417     int64_t cindex;
2418
2419     pa_assert(s);
2420     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2421     pa_assert(r_usec);
2422
2423     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2424     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2425     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2426     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2427     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2428     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2429
2430     if ((r = pa_stream_get_time(s, &t)) < 0)
2431         return r;
2432
2433     if (s->direction == PA_STREAM_PLAYBACK)
2434         cindex = s->timing_info.write_index;
2435     else
2436         cindex = s->timing_info.read_index;
2437
2438     if (cindex < 0)
2439         cindex = 0;
2440
2441     c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2442
2443     if (s->direction == PA_STREAM_PLAYBACK)
2444         *r_usec = time_counter_diff(s, c, t, negative);
2445     else
2446         *r_usec = time_counter_diff(s, t, c, negative);
2447
2448     return 0;
2449 }
2450
2451 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2452     pa_assert(s);
2453     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2454
2455     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2456     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2457     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2458     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2459
2460     return &s->timing_info;
2461 }
2462
2463 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2464     pa_assert(s);
2465     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2466
2467     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2468
2469     return &s->sample_spec;
2470 }
2471
2472 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2473     pa_assert(s);
2474     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2475
2476     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2477
2478     return &s->channel_map;
2479 }
2480
2481 const pa_format_info* pa_stream_get_format_info(pa_stream *s) {
2482     pa_assert(s);
2483     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2484
2485     /* We don't have the format till routing is done */
2486     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2487     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2488
2489     return s->format;
2490 }
2491 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2492     pa_assert(s);
2493     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2494
2495     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2496     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2497     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2498
2499     return &s->buffer_attr;
2500 }
2501
2502 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2503     pa_operation *o = userdata;
2504     int success = 1;
2505
2506     pa_assert(pd);
2507     pa_assert(o);
2508     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2509
2510     if (!o->context)
2511         goto finish;
2512
2513     if (command != PA_COMMAND_REPLY) {
2514         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2515             goto finish;
2516
2517         success = 0;
2518     } else {
2519         if (o->stream->direction == PA_STREAM_PLAYBACK) {
2520             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2521                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2522                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2523                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2524                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2525                 goto finish;
2526             }
2527         } else if (o->stream->direction == PA_STREAM_RECORD) {
2528             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2529                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2530                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2531                 goto finish;
2532             }
2533         }
2534
2535         if (o->stream->context->version >= 13) {
2536             pa_usec_t usec;
2537
2538             if (pa_tagstruct_get_usec(t, &usec) < 0) {
2539                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2540                 goto finish;
2541             }
2542
2543             if (o->stream->direction == PA_STREAM_RECORD)
2544                 o->stream->timing_info.configured_source_usec = usec;
2545             else
2546                 o->stream->timing_info.configured_sink_usec = usec;
2547         }
2548
2549         if (!pa_tagstruct_eof(t)) {
2550             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2551             goto finish;
2552         }
2553     }
2554
2555     if (o->callback) {
2556         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2557         cb(o->stream, success, o->userdata);
2558     }
2559
2560 finish:
2561     pa_operation_done(o);
2562     pa_operation_unref(o);
2563 }
2564
2565
2566 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2567     pa_operation *o;
2568     pa_tagstruct *t;
2569     uint32_t tag;
2570     pa_buffer_attr copy;
2571
2572     pa_assert(s);
2573     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2574     pa_assert(attr);
2575
2576     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2577     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2578     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2579     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2580
2581     /* Ask for a timing update before we cork/uncork to get the best
2582      * accuracy for the transport latency suitable for the
2583      * check_smoother_status() call in the started callback */
2584     request_auto_timing_update(s, TRUE);
2585
2586     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2587
2588     t = pa_tagstruct_command(
2589             s->context,
2590             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2591             &tag);
2592     pa_tagstruct_putu32(t, s->channel);
2593
2594     copy = *attr;
2595     patch_buffer_attr(s, &copy, NULL);
2596     attr = &copy;
2597
2598     pa_tagstruct_putu32(t, attr->maxlength);
2599
2600     if (s->direction == PA_STREAM_PLAYBACK)
2601         pa_tagstruct_put(
2602                 t,
2603                 PA_TAG_U32, attr->tlength,
2604                 PA_TAG_U32, attr->prebuf,
2605                 PA_TAG_U32, attr->minreq,
2606                 PA_TAG_INVALID);
2607     else
2608         pa_tagstruct_putu32(t, attr->fragsize);
2609
2610     if (s->context->version >= 13)
2611         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2612
2613     if (s->context->version >= 14)
2614         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2615
2616     pa_pstream_send_tagstruct(s->context->pstream, t);
2617     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2618
2619     /* This might cause changes in the read/write indexex, hence let's
2620      * request a timing update */
2621     request_auto_timing_update(s, TRUE);
2622
2623     return o;
2624 }
2625
2626 uint32_t pa_stream_get_device_index(pa_stream *s) {
2627     pa_assert(s);
2628     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2629
2630     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2631     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2632     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2633     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2634     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2635
2636     return s->device_index;
2637 }
2638
2639 const char *pa_stream_get_device_name(pa_stream *s) {
2640     pa_assert(s);
2641     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2642
2643     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2644     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2645     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2646     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2647     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2648
2649     return s->device_name;
2650 }
2651
2652 int pa_stream_is_suspended(pa_stream *s) {
2653     pa_assert(s);
2654     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2655
2656     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2657     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2658     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2659     PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2660
2661     return s->suspended;
2662 }
2663
2664 int pa_stream_is_corked(pa_stream *s) {
2665     pa_assert(s);
2666     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2667
2668     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2669     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2670     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2671
2672     return s->corked;
2673 }
2674
2675 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2676     pa_operation *o = userdata;
2677     int success = 1;
2678
2679     pa_assert(pd);
2680     pa_assert(o);
2681     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2682
2683     if (!o->context)
2684         goto finish;
2685
2686     if (command != PA_COMMAND_REPLY) {
2687         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2688             goto finish;
2689
2690         success = 0;
2691     } else {
2692
2693         if (!pa_tagstruct_eof(t)) {
2694             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2695             goto finish;
2696         }
2697     }
2698
2699     o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2700     pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2701
2702     if (o->callback) {
2703         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2704         cb(o->stream, success, o->userdata);
2705     }
2706
2707 finish:
2708     pa_operation_done(o);
2709     pa_operation_unref(o);
2710 }
2711
2712
2713 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2714     pa_operation *o;
2715     pa_tagstruct *t;
2716     uint32_t tag;
2717
2718     pa_assert(s);
2719     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2720
2721     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2722     PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2723     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2724     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2725     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2726     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2727
2728     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2729     o->private = PA_UINT_TO_PTR(rate);
2730
2731     t = pa_tagstruct_command(
2732             s->context,
2733             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2734             &tag);
2735     pa_tagstruct_putu32(t, s->channel);
2736     pa_tagstruct_putu32(t, rate);
2737
2738     pa_pstream_send_tagstruct(s->context->pstream, t);
2739     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2740
2741     return o;
2742 }
2743
2744 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2745     pa_operation *o;
2746     pa_tagstruct *t;
2747     uint32_t tag;
2748
2749     pa_assert(s);
2750     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2751
2752     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2753     PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2754     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2755     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2756     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2757
2758     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2759
2760     t = pa_tagstruct_command(
2761             s->context,
2762             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2763             &tag);
2764     pa_tagstruct_putu32(t, s->channel);
2765     pa_tagstruct_putu32(t, (uint32_t) mode);
2766     pa_tagstruct_put_proplist(t, p);
2767
2768     pa_pstream_send_tagstruct(s->context->pstream, t);
2769     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2770
2771     /* Please note that we don't update s->proplist here, because we
2772      * don't export that field */
2773
2774     return o;
2775 }
2776
2777 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2778     pa_operation *o;
2779     pa_tagstruct *t;
2780     uint32_t tag;
2781     const char * const*k;
2782
2783     pa_assert(s);
2784     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2785
2786     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2787     PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2788     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2789     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2790     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2791
2792     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2793
2794     t = pa_tagstruct_command(
2795             s->context,
2796             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2797             &tag);
2798     pa_tagstruct_putu32(t, s->channel);
2799
2800     for (k = keys; *k; k++)
2801         pa_tagstruct_puts(t, *k);
2802
2803     pa_tagstruct_puts(t, NULL);
2804
2805     pa_pstream_send_tagstruct(s->context->pstream, t);
2806     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2807
2808     /* Please note that we don't update s->proplist here, because we
2809      * don't export that field */
2810
2811     return o;
2812 }
2813
2814 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2815     pa_assert(s);
2816     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2817
2818     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2819     PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2820     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2821     PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2822
2823     s->direct_on_input = sink_input_idx;
2824
2825     return 0;
2826 }
2827
2828 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2829     pa_assert(s);
2830     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2831
2832     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2833     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2834     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2835
2836     return s->direct_on_input;
2837 }