Fix two comment typos.
[profile/ivi/pulseaudio.git] / src / pulse / stream.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/rtclock.h>
34 #include <pulse/xmalloc.h>
35 #include <pulse/fork-detect.h>
36
37 #include <pulsecore/pstream-util.h>
38 #include <pulsecore/log.h>
39 #include <pulsecore/hashmap.h>
40 #include <pulsecore/macro.h>
41 #include <pulsecore/core-rtclock.h>
42 #include <pulsecore/core-util.h>
43
44 #include "internal.h"
45 #include "stream.h"
46
47 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
48 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
49
50 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
51 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
52 #define SMOOTHER_MIN_HISTORY (4)
53
54 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
55     return pa_stream_new_with_proplist(c, name, ss, map, NULL);
56 }
57
58 static void reset_callbacks(pa_stream *s) {
59     s->read_callback = NULL;
60     s->read_userdata = NULL;
61     s->write_callback = NULL;
62     s->write_userdata = NULL;
63     s->state_callback = NULL;
64     s->state_userdata = NULL;
65     s->overflow_callback = NULL;
66     s->overflow_userdata = NULL;
67     s->underflow_callback = NULL;
68     s->underflow_userdata = NULL;
69     s->latency_update_callback = NULL;
70     s->latency_update_userdata = NULL;
71     s->moved_callback = NULL;
72     s->moved_userdata = NULL;
73     s->suspended_callback = NULL;
74     s->suspended_userdata = NULL;
75     s->started_callback = NULL;
76     s->started_userdata = NULL;
77     s->event_callback = NULL;
78     s->event_userdata = NULL;
79     s->buffer_attr_callback = NULL;
80     s->buffer_attr_userdata = NULL;
81 }
82
83 static pa_stream *pa_stream_new_with_proplist_internal(
84         pa_context *c,
85         const char *name,
86         const pa_sample_spec *ss,
87         const pa_channel_map *map,
88         pa_format_info * const *formats,
89         unsigned int n_formats,
90         pa_proplist *p) {
91
92     pa_stream *s;
93     unsigned int i;
94
95     pa_assert(c);
96     pa_assert(PA_REFCNT_VALUE(c) >= 1);
97     pa_assert((ss == NULL && map == NULL) || (formats == NULL && n_formats == 0));
98     pa_assert(n_formats < PA_MAX_FORMATS);
99
100     PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
101     PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
102
103     s = pa_xnew(pa_stream, 1);
104     PA_REFCNT_INIT(s);
105     s->context = c;
106     s->mainloop = c->mainloop;
107
108     s->direction = PA_STREAM_NODIRECTION;
109     s->state = PA_STREAM_UNCONNECTED;
110     s->flags = 0;
111
112     if (ss)
113         s->sample_spec = *ss;
114     else
115         s->sample_spec.format = PA_SAMPLE_INVALID;
116
117     if (map)
118         s->channel_map = *map;
119     else
120         pa_channel_map_init(&s->channel_map);
121
122     s->n_formats = 0;
123     if (formats) {
124         s->n_formats = n_formats;
125         for (i = 0; i < n_formats; i++)
126             s->req_formats[i] = pa_format_info_copy(formats[i]);
127     }
128
129     /* We'll get the final negotiated format after connecting */
130     s->format = NULL;
131
132     s->direct_on_input = PA_INVALID_INDEX;
133
134     s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
135     if (name)
136         pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
137
138     s->channel = 0;
139     s->channel_valid = FALSE;
140     s->syncid = c->csyncid++;
141     s->stream_index = PA_INVALID_INDEX;
142
143     s->requested_bytes = 0;
144     memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
145
146     /* We initialize der target length here, so that if the user
147      * passes no explicit buffering metrics the default is similar to
148      * what older PA versions provided. */
149
150     s->buffer_attr.maxlength = (uint32_t) -1;
151     if (ss)
152         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
153     else {
154         /* FIXME: We assume a worst-case compressed format corresponding to
155          * 48000 Hz, 2 ch, S16 PCM, but this can very well be incorrect */
156         pa_sample_spec tmp_ss = {
157             .format   = PA_SAMPLE_S16NE,
158             .rate     = 48000,
159             .channels = 2,
160         };
161         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &tmp_ss); /* 250ms of buffering */
162     }
163     s->buffer_attr.minreq = (uint32_t) -1;
164     s->buffer_attr.prebuf = (uint32_t) -1;
165     s->buffer_attr.fragsize = (uint32_t) -1;
166
167     s->device_index = PA_INVALID_INDEX;
168     s->device_name = NULL;
169     s->suspended = FALSE;
170     s->corked = FALSE;
171
172     s->write_memblock = NULL;
173     s->write_data = NULL;
174
175     pa_memchunk_reset(&s->peek_memchunk);
176     s->peek_data = NULL;
177     s->record_memblockq = NULL;
178
179     memset(&s->timing_info, 0, sizeof(s->timing_info));
180     s->timing_info_valid = FALSE;
181
182     s->previous_time = 0;
183
184     s->read_index_not_before = 0;
185     s->write_index_not_before = 0;
186     for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
187         s->write_index_corrections[i].valid = 0;
188     s->current_write_index_correction = 0;
189
190     s->auto_timing_update_event = NULL;
191     s->auto_timing_update_requested = FALSE;
192     s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
193
194     reset_callbacks(s);
195
196     s->smoother = NULL;
197
198     /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
199     PA_LLIST_PREPEND(pa_stream, c->streams, s);
200     pa_stream_ref(s);
201
202     return s;
203 }
204
205 pa_stream *pa_stream_new_with_proplist(
206         pa_context *c,
207         const char *name,
208         const pa_sample_spec *ss,
209         const pa_channel_map *map,
210         pa_proplist *p) {
211
212     pa_channel_map tmap;
213
214     PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
215     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
216     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
217     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
218     PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
219
220     if (!map)
221         PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
222
223     return pa_stream_new_with_proplist_internal(c, name, ss, map, NULL, 0, p);
224 }
225
226 pa_stream *pa_stream_new_extended(
227         pa_context *c,
228         const char *name,
229         pa_format_info * const *formats,
230         unsigned int n_formats,
231         pa_proplist *p) {
232
233     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 21, PA_ERR_NOTSUPPORTED);
234
235     return pa_stream_new_with_proplist_internal(c, name, NULL, NULL, formats, n_formats, p);
236 }
237
238 static void stream_unlink(pa_stream *s) {
239     pa_operation *o, *n;
240     pa_assert(s);
241
242     if (!s->context)
243         return;
244
245     /* Detach from context */
246
247     /* Unref all operatio object that point to us */
248     for (o = s->context->operations; o; o = n) {
249         n = o->next;
250
251         if (o->stream == s)
252             pa_operation_cancel(o);
253     }
254
255     /* Drop all outstanding replies for this stream */
256     if (s->context->pdispatch)
257         pa_pdispatch_unregister_reply(s->context->pdispatch, s);
258
259     if (s->channel_valid) {
260         pa_hashmap_remove((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, PA_UINT32_TO_PTR(s->channel));
261         s->channel = 0;
262         s->channel_valid = FALSE;
263     }
264
265     PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
266     pa_stream_unref(s);
267
268     s->context = NULL;
269
270     if (s->auto_timing_update_event) {
271         pa_assert(s->mainloop);
272         s->mainloop->time_free(s->auto_timing_update_event);
273     }
274
275     reset_callbacks(s);
276 }
277
278 static void stream_free(pa_stream *s) {
279     unsigned int i;
280
281     pa_assert(s);
282
283     stream_unlink(s);
284
285     if (s->write_memblock) {
286         pa_memblock_release(s->write_memblock);
287         pa_memblock_unref(s->write_data);
288     }
289
290     if (s->peek_memchunk.memblock) {
291         if (s->peek_data)
292             pa_memblock_release(s->peek_memchunk.memblock);
293         pa_memblock_unref(s->peek_memchunk.memblock);
294     }
295
296     if (s->record_memblockq)
297         pa_memblockq_free(s->record_memblockq);
298
299     if (s->proplist)
300         pa_proplist_free(s->proplist);
301
302     if (s->smoother)
303         pa_smoother_free(s->smoother);
304
305     for (i = 0; i < s->n_formats; i++)
306         pa_format_info_free(s->req_formats[i]);
307
308     if (s->format)
309         pa_format_info_free(s->format);
310
311     pa_xfree(s->device_name);
312     pa_xfree(s);
313 }
314
315 void pa_stream_unref(pa_stream *s) {
316     pa_assert(s);
317     pa_assert(PA_REFCNT_VALUE(s) >= 1);
318
319     if (PA_REFCNT_DEC(s) <= 0)
320         stream_free(s);
321 }
322
323 pa_stream* pa_stream_ref(pa_stream *s) {
324     pa_assert(s);
325     pa_assert(PA_REFCNT_VALUE(s) >= 1);
326
327     PA_REFCNT_INC(s);
328     return s;
329 }
330
331 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
332     pa_assert(s);
333     pa_assert(PA_REFCNT_VALUE(s) >= 1);
334
335     return s->state;
336 }
337
338 pa_context* pa_stream_get_context(pa_stream *s) {
339     pa_assert(s);
340     pa_assert(PA_REFCNT_VALUE(s) >= 1);
341
342     return s->context;
343 }
344
345 uint32_t pa_stream_get_index(pa_stream *s) {
346     pa_assert(s);
347     pa_assert(PA_REFCNT_VALUE(s) >= 1);
348
349     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
350     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
351
352     return s->stream_index;
353 }
354
355 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
356     pa_assert(s);
357     pa_assert(PA_REFCNT_VALUE(s) >= 1);
358
359     if (s->state == st)
360         return;
361
362     pa_stream_ref(s);
363
364     s->state = st;
365
366     if (s->state_callback)
367         s->state_callback(s, s->state_userdata);
368
369     if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
370         stream_unlink(s);
371
372     pa_stream_unref(s);
373 }
374
375 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
376     pa_assert(s);
377     pa_assert(PA_REFCNT_VALUE(s) >= 1);
378
379     if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
380         return;
381
382     if (s->state == PA_STREAM_READY &&
383         (force || !s->auto_timing_update_requested)) {
384         pa_operation *o;
385
386 /*         pa_log("Automatically requesting new timing data"); */
387
388         if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
389             pa_operation_unref(o);
390             s->auto_timing_update_requested = TRUE;
391         }
392     }
393
394     if (s->auto_timing_update_event) {
395         if (s->suspended && !force) {
396             pa_assert(s->mainloop);
397             s->mainloop->time_free(s->auto_timing_update_event);
398             s->auto_timing_update_event = NULL;
399         } else {
400             if (force)
401                 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
402
403             pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
404
405             s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
406         }
407     }
408 }
409
410 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
411     pa_context *c = userdata;
412     pa_stream *s;
413     uint32_t channel;
414
415     pa_assert(pd);
416     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
417     pa_assert(t);
418     pa_assert(c);
419     pa_assert(PA_REFCNT_VALUE(c) >= 1);
420
421     pa_context_ref(c);
422
423     if (pa_tagstruct_getu32(t, &channel) < 0 ||
424         !pa_tagstruct_eof(t)) {
425         pa_context_fail(c, PA_ERR_PROTOCOL);
426         goto finish;
427     }
428
429     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
430         goto finish;
431
432     if (s->state != PA_STREAM_READY)
433         goto finish;
434
435     pa_context_set_error(c, PA_ERR_KILLED);
436     pa_stream_set_state(s, PA_STREAM_FAILED);
437
438 finish:
439     pa_context_unref(c);
440 }
441
442 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
443     pa_usec_t x;
444
445     pa_assert(s);
446     pa_assert(!force_start || !force_stop);
447
448     if (!s->smoother)
449         return;
450
451     x = pa_rtclock_now();
452
453     if (s->timing_info_valid) {
454         if (aposteriori)
455             x -= s->timing_info.transport_usec;
456         else
457             x += s->timing_info.transport_usec;
458     }
459
460     if (s->suspended || s->corked || force_stop)
461         pa_smoother_pause(s->smoother, x);
462     else if (force_start || s->buffer_attr.prebuf == 0) {
463
464         if (!s->timing_info_valid &&
465             !aposteriori &&
466             !force_start &&
467             !force_stop &&
468             s->context->version >= 13) {
469
470             /* If the server supports STARTED events we take them as
471              * indications when audio really starts/stops playing, if
472              * we don't have any timing info yet -- instead of trying
473              * to be smart and guessing the server time. Otherwise the
474              * unknown transport delay add too much noise to our time
475              * calculations. */
476
477             return;
478         }
479
480         pa_smoother_resume(s->smoother, x, TRUE);
481     }
482
483     /* Please note that we have no idea if playback actually started
484      * if prebuf is non-zero! */
485 }
486
487 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata);
488
489 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
490     pa_context *c = userdata;
491     pa_stream *s;
492     uint32_t channel;
493     const char *dn;
494     pa_bool_t suspended;
495     uint32_t di;
496     pa_usec_t usec = 0;
497     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
498
499     pa_assert(pd);
500     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
501     pa_assert(t);
502     pa_assert(c);
503     pa_assert(PA_REFCNT_VALUE(c) >= 1);
504
505     pa_context_ref(c);
506
507     if (c->version < 12) {
508         pa_context_fail(c, PA_ERR_PROTOCOL);
509         goto finish;
510     }
511
512     if (pa_tagstruct_getu32(t, &channel) < 0 ||
513         pa_tagstruct_getu32(t, &di) < 0 ||
514         pa_tagstruct_gets(t, &dn) < 0 ||
515         pa_tagstruct_get_boolean(t, &suspended) < 0) {
516         pa_context_fail(c, PA_ERR_PROTOCOL);
517         goto finish;
518     }
519
520     if (c->version >= 13) {
521
522         if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
523             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
524                 pa_tagstruct_getu32(t, &fragsize) < 0 ||
525                 pa_tagstruct_get_usec(t, &usec) < 0) {
526                 pa_context_fail(c, PA_ERR_PROTOCOL);
527                 goto finish;
528             }
529         } else {
530             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
531                 pa_tagstruct_getu32(t, &tlength) < 0 ||
532                 pa_tagstruct_getu32(t, &prebuf) < 0 ||
533                 pa_tagstruct_getu32(t, &minreq) < 0 ||
534                 pa_tagstruct_get_usec(t, &usec) < 0) {
535                 pa_context_fail(c, PA_ERR_PROTOCOL);
536                 goto finish;
537             }
538         }
539     }
540
541     if (!pa_tagstruct_eof(t)) {
542         pa_context_fail(c, PA_ERR_PROTOCOL);
543         goto finish;
544     }
545
546     if (!dn || di == PA_INVALID_INDEX) {
547         pa_context_fail(c, PA_ERR_PROTOCOL);
548         goto finish;
549     }
550
551     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
552         goto finish;
553
554     if (s->state != PA_STREAM_READY)
555         goto finish;
556
557     if (c->version >= 13) {
558         if (s->direction == PA_STREAM_RECORD)
559             s->timing_info.configured_source_usec = usec;
560         else
561             s->timing_info.configured_sink_usec = usec;
562
563         s->buffer_attr.maxlength = maxlength;
564         s->buffer_attr.fragsize = fragsize;
565         s->buffer_attr.tlength = tlength;
566         s->buffer_attr.prebuf = prebuf;
567         s->buffer_attr.minreq = minreq;
568     }
569
570     pa_xfree(s->device_name);
571     s->device_name = pa_xstrdup(dn);
572     s->device_index = di;
573
574     s->suspended = suspended;
575
576     if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
577         s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
578         s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
579         request_auto_timing_update(s, TRUE);
580     }
581
582     check_smoother_status(s, TRUE, FALSE, FALSE);
583     request_auto_timing_update(s, TRUE);
584
585     if (s->moved_callback)
586         s->moved_callback(s, s->moved_userdata);
587
588 finish:
589     pa_context_unref(c);
590 }
591
592 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
593     pa_context *c = userdata;
594     pa_stream *s;
595     uint32_t channel;
596     pa_usec_t usec = 0;
597     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
598
599     pa_assert(pd);
600     pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
601     pa_assert(t);
602     pa_assert(c);
603     pa_assert(PA_REFCNT_VALUE(c) >= 1);
604
605     pa_context_ref(c);
606
607     if (c->version < 15) {
608         pa_context_fail(c, PA_ERR_PROTOCOL);
609         goto finish;
610     }
611
612     if (pa_tagstruct_getu32(t, &channel) < 0) {
613         pa_context_fail(c, PA_ERR_PROTOCOL);
614         goto finish;
615     }
616
617     if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
618         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
619             pa_tagstruct_getu32(t, &fragsize) < 0 ||
620             pa_tagstruct_get_usec(t, &usec) < 0) {
621             pa_context_fail(c, PA_ERR_PROTOCOL);
622             goto finish;
623         }
624     } else {
625         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
626             pa_tagstruct_getu32(t, &tlength) < 0 ||
627             pa_tagstruct_getu32(t, &prebuf) < 0 ||
628             pa_tagstruct_getu32(t, &minreq) < 0 ||
629             pa_tagstruct_get_usec(t, &usec) < 0) {
630             pa_context_fail(c, PA_ERR_PROTOCOL);
631             goto finish;
632         }
633     }
634
635     if (!pa_tagstruct_eof(t)) {
636         pa_context_fail(c, PA_ERR_PROTOCOL);
637         goto finish;
638     }
639
640     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
641         goto finish;
642
643     if (s->state != PA_STREAM_READY)
644         goto finish;
645
646     if (s->direction == PA_STREAM_RECORD)
647         s->timing_info.configured_source_usec = usec;
648     else
649         s->timing_info.configured_sink_usec = usec;
650
651     s->buffer_attr.maxlength = maxlength;
652     s->buffer_attr.fragsize = fragsize;
653     s->buffer_attr.tlength = tlength;
654     s->buffer_attr.prebuf = prebuf;
655     s->buffer_attr.minreq = minreq;
656
657     request_auto_timing_update(s, TRUE);
658
659     if (s->buffer_attr_callback)
660         s->buffer_attr_callback(s, s->buffer_attr_userdata);
661
662 finish:
663     pa_context_unref(c);
664 }
665
666 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
667     pa_context *c = userdata;
668     pa_stream *s;
669     uint32_t channel;
670     pa_bool_t suspended;
671
672     pa_assert(pd);
673     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
674     pa_assert(t);
675     pa_assert(c);
676     pa_assert(PA_REFCNT_VALUE(c) >= 1);
677
678     pa_context_ref(c);
679
680     if (c->version < 12) {
681         pa_context_fail(c, PA_ERR_PROTOCOL);
682         goto finish;
683     }
684
685     if (pa_tagstruct_getu32(t, &channel) < 0 ||
686         pa_tagstruct_get_boolean(t, &suspended) < 0 ||
687         !pa_tagstruct_eof(t)) {
688         pa_context_fail(c, PA_ERR_PROTOCOL);
689         goto finish;
690     }
691
692     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
693         goto finish;
694
695     if (s->state != PA_STREAM_READY)
696         goto finish;
697
698     s->suspended = suspended;
699
700     if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
701         s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
702         s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
703         request_auto_timing_update(s, TRUE);
704     }
705
706     check_smoother_status(s, TRUE, FALSE, FALSE);
707     request_auto_timing_update(s, TRUE);
708
709     if (s->suspended_callback)
710         s->suspended_callback(s, s->suspended_userdata);
711
712 finish:
713     pa_context_unref(c);
714 }
715
716 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
717     pa_context *c = userdata;
718     pa_stream *s;
719     uint32_t channel;
720
721     pa_assert(pd);
722     pa_assert(command == PA_COMMAND_STARTED);
723     pa_assert(t);
724     pa_assert(c);
725     pa_assert(PA_REFCNT_VALUE(c) >= 1);
726
727     pa_context_ref(c);
728
729     if (c->version < 13) {
730         pa_context_fail(c, PA_ERR_PROTOCOL);
731         goto finish;
732     }
733
734     if (pa_tagstruct_getu32(t, &channel) < 0 ||
735         !pa_tagstruct_eof(t)) {
736         pa_context_fail(c, PA_ERR_PROTOCOL);
737         goto finish;
738     }
739
740     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
741         goto finish;
742
743     if (s->state != PA_STREAM_READY)
744         goto finish;
745
746     check_smoother_status(s, TRUE, TRUE, FALSE);
747     request_auto_timing_update(s, TRUE);
748
749     if (s->started_callback)
750         s->started_callback(s, s->started_userdata);
751
752 finish:
753     pa_context_unref(c);
754 }
755
756 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
757     pa_context *c = userdata;
758     pa_stream *s;
759     uint32_t channel;
760     pa_proplist *pl = NULL;
761     const char *event;
762
763     pa_assert(pd);
764     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
765     pa_assert(t);
766     pa_assert(c);
767     pa_assert(PA_REFCNT_VALUE(c) >= 1);
768
769     pa_context_ref(c);
770
771     if (c->version < 15) {
772         pa_context_fail(c, PA_ERR_PROTOCOL);
773         goto finish;
774     }
775
776     pl = pa_proplist_new();
777
778     if (pa_tagstruct_getu32(t, &channel) < 0 ||
779         pa_tagstruct_gets(t, &event) < 0 ||
780         pa_tagstruct_get_proplist(t, pl) < 0 ||
781         !pa_tagstruct_eof(t) || !event) {
782         pa_context_fail(c, PA_ERR_PROTOCOL);
783         goto finish;
784     }
785
786     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
787         goto finish;
788
789     if (s->state != PA_STREAM_READY)
790         goto finish;
791
792     if (pa_streq(event, PA_STREAM_EVENT_FORMAT_LOST)) {
793         /* Let client know what the running time was when the stream had to be killed  */
794         pa_usec_t stream_time;
795         if (pa_stream_get_time(s, &stream_time) == 0)
796             pa_proplist_setf(pl, "stream-time", "%llu", (unsigned long long) stream_time);
797     }
798
799     if (s->event_callback)
800         s->event_callback(s, event, pl, s->event_userdata);
801
802 finish:
803     pa_context_unref(c);
804
805     if (pl)
806         pa_proplist_free(pl);
807 }
808
809 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
810     pa_stream *s;
811     pa_context *c = userdata;
812     uint32_t bytes, channel;
813
814     pa_assert(pd);
815     pa_assert(command == PA_COMMAND_REQUEST);
816     pa_assert(t);
817     pa_assert(c);
818     pa_assert(PA_REFCNT_VALUE(c) >= 1);
819
820     pa_context_ref(c);
821
822     if (pa_tagstruct_getu32(t, &channel) < 0 ||
823         pa_tagstruct_getu32(t, &bytes) < 0 ||
824         !pa_tagstruct_eof(t)) {
825         pa_context_fail(c, PA_ERR_PROTOCOL);
826         goto finish;
827     }
828
829     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
830         goto finish;
831
832     if (s->state != PA_STREAM_READY)
833         goto finish;
834
835     s->requested_bytes += bytes;
836
837     /* pa_log("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes); */
838
839     if (s->requested_bytes > 0 && s->write_callback)
840         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
841
842 finish:
843     pa_context_unref(c);
844 }
845
846 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
847     pa_stream *s;
848     pa_context *c = userdata;
849     uint32_t channel;
850
851     pa_assert(pd);
852     pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
853     pa_assert(t);
854     pa_assert(c);
855     pa_assert(PA_REFCNT_VALUE(c) >= 1);
856
857     pa_context_ref(c);
858
859     if (pa_tagstruct_getu32(t, &channel) < 0 ||
860         !pa_tagstruct_eof(t)) {
861         pa_context_fail(c, PA_ERR_PROTOCOL);
862         goto finish;
863     }
864
865     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
866         goto finish;
867
868     if (s->state != PA_STREAM_READY)
869         goto finish;
870
871     if (s->buffer_attr.prebuf > 0)
872         check_smoother_status(s, TRUE, FALSE, TRUE);
873
874     request_auto_timing_update(s, TRUE);
875
876     if (command == PA_COMMAND_OVERFLOW) {
877         if (s->overflow_callback)
878             s->overflow_callback(s, s->overflow_userdata);
879     } else if (command == PA_COMMAND_UNDERFLOW) {
880         if (s->underflow_callback)
881             s->underflow_callback(s, s->underflow_userdata);
882     }
883
884 finish:
885     pa_context_unref(c);
886 }
887
888 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
889     pa_assert(s);
890     pa_assert(PA_REFCNT_VALUE(s) >= 1);
891
892 /*     pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
893
894     if (s->state != PA_STREAM_READY)
895         return;
896
897     if (w) {
898         s->write_index_not_before = s->context->ctag;
899
900         if (s->timing_info_valid)
901             s->timing_info.write_index_corrupt = TRUE;
902
903 /*         pa_log("write_index invalidated"); */
904     }
905
906     if (r) {
907         s->read_index_not_before = s->context->ctag;
908
909         if (s->timing_info_valid)
910             s->timing_info.read_index_corrupt = TRUE;
911
912 /*         pa_log("read_index invalidated"); */
913     }
914
915     request_auto_timing_update(s, TRUE);
916 }
917
918 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
919     pa_stream *s = userdata;
920
921     pa_assert(s);
922     pa_assert(PA_REFCNT_VALUE(s) >= 1);
923
924     pa_stream_ref(s);
925     request_auto_timing_update(s, FALSE);
926     pa_stream_unref(s);
927 }
928
929 static void create_stream_complete(pa_stream *s) {
930     pa_assert(s);
931     pa_assert(PA_REFCNT_VALUE(s) >= 1);
932     pa_assert(s->state == PA_STREAM_CREATING);
933
934     pa_stream_set_state(s, PA_STREAM_READY);
935
936     if (s->requested_bytes > 0 && s->write_callback)
937         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
938
939     if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
940         s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
941         pa_assert(!s->auto_timing_update_event);
942         s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
943
944         request_auto_timing_update(s, TRUE);
945     }
946
947     check_smoother_status(s, TRUE, FALSE, FALSE);
948 }
949
950 static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
951     const char *e;
952
953     pa_assert(s);
954     pa_assert(attr);
955
956     if ((e = getenv("PULSE_LATENCY_MSEC"))) {
957         uint32_t ms;
958
959         if (pa_atou(e, &ms) < 0 || ms <= 0)
960             pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
961         else {
962             attr->maxlength = (uint32_t) -1;
963             attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &s->sample_spec);
964             attr->minreq = (uint32_t) -1;
965             attr->prebuf = (uint32_t) -1;
966             attr->fragsize = attr->tlength;
967         }
968
969         if (flags)
970             *flags |= PA_STREAM_ADJUST_LATENCY;
971     }
972
973     if (s->context->version >= 13)
974         return;
975
976     /* Version older than 0.9.10 didn't do server side buffer_attr
977      * selection, hence we have to fake it on the client side. */
978
979     /* We choose fairly conservative values here, to not confuse
980      * old clients with extremely large playback buffers */
981
982     if (attr->maxlength == (uint32_t) -1)
983         attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
984
985     if (attr->tlength == (uint32_t) -1)
986         attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */
987
988     if (attr->minreq == (uint32_t) -1)
989         attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
990
991     if (attr->prebuf == (uint32_t) -1)
992         attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
993
994     if (attr->fragsize == (uint32_t) -1)
995         attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
996 }
997
998 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
999     pa_stream *s = userdata;
1000     uint32_t requested_bytes = 0;
1001
1002     pa_assert(pd);
1003     pa_assert(s);
1004     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1005     pa_assert(s->state == PA_STREAM_CREATING);
1006
1007     pa_stream_ref(s);
1008
1009     if (command != PA_COMMAND_REPLY) {
1010         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1011             goto finish;
1012
1013         pa_stream_set_state(s, PA_STREAM_FAILED);
1014         goto finish;
1015     }
1016
1017     if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
1018         s->channel == PA_INVALID_INDEX ||
1019         ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
1020         ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
1021         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1022         goto finish;
1023     }
1024
1025     s->requested_bytes = (int64_t) requested_bytes;
1026
1027     if (s->context->version >= 9) {
1028         if (s->direction == PA_STREAM_PLAYBACK) {
1029             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1030                 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
1031                 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
1032                 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
1033                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1034                 goto finish;
1035             }
1036         } else if (s->direction == PA_STREAM_RECORD) {
1037             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1038                 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
1039                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1040                 goto finish;
1041             }
1042         }
1043     }
1044
1045     if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
1046         pa_sample_spec ss;
1047         pa_channel_map cm;
1048         const char *dn = NULL;
1049         pa_bool_t suspended;
1050
1051         if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1052             pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1053             pa_tagstruct_getu32(t, &s->device_index) < 0 ||
1054             pa_tagstruct_gets(t, &dn) < 0 ||
1055             pa_tagstruct_get_boolean(t, &suspended) < 0) {
1056             pa_context_fail(s->context, PA_ERR_PROTOCOL);
1057             goto finish;
1058         }
1059
1060         if (!dn || s->device_index == PA_INVALID_INDEX ||
1061             ss.channels != cm.channels ||
1062             !pa_channel_map_valid(&cm) ||
1063             !pa_sample_spec_valid(&ss) ||
1064             (s->n_formats == 0 && (
1065                 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
1066                 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
1067                 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))))) {
1068             pa_context_fail(s->context, PA_ERR_PROTOCOL);
1069             goto finish;
1070         }
1071
1072         pa_xfree(s->device_name);
1073         s->device_name = pa_xstrdup(dn);
1074         s->suspended = suspended;
1075
1076         s->channel_map = cm;
1077         s->sample_spec = ss;
1078     }
1079
1080     if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
1081         pa_usec_t usec;
1082
1083         if (pa_tagstruct_get_usec(t, &usec) < 0) {
1084             pa_context_fail(s->context, PA_ERR_PROTOCOL);
1085             goto finish;
1086         }
1087
1088         if (s->direction == PA_STREAM_RECORD)
1089             s->timing_info.configured_source_usec = usec;
1090         else
1091             s->timing_info.configured_sink_usec = usec;
1092     }
1093
1094     if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
1095         || s->context->version >= 22) {
1096
1097         pa_format_info *f = pa_format_info_new();
1098         pa_tagstruct_get_format_info(t, f);
1099
1100         if (pa_format_info_valid(f))
1101             s->format = f;
1102         else {
1103             pa_format_info_free(f);
1104             if (s->n_formats > 0) {
1105                 /* We used the extended API, so we should have got back a proper format */
1106                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1107                 goto finish;
1108             }
1109         }
1110     }
1111
1112     if (!pa_tagstruct_eof(t)) {
1113         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1114         goto finish;
1115     }
1116
1117     if (s->direction == PA_STREAM_RECORD) {
1118         pa_assert(!s->record_memblockq);
1119
1120         s->record_memblockq = pa_memblockq_new(
1121                 0,
1122                 s->buffer_attr.maxlength,
1123                 0,
1124                 pa_frame_size(&s->sample_spec),
1125                 1,
1126                 0,
1127                 0,
1128                 NULL);
1129     }
1130
1131     s->channel_valid = TRUE;
1132     pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);
1133
1134     create_stream_complete(s);
1135
1136 finish:
1137     pa_stream_unref(s);
1138 }
1139
1140 static int create_stream(
1141         pa_stream_direction_t direction,
1142         pa_stream *s,
1143         const char *dev,
1144         const pa_buffer_attr *attr,
1145         pa_stream_flags_t flags,
1146         const pa_cvolume *volume,
1147         pa_stream *sync_stream) {
1148
1149     pa_tagstruct *t;
1150     uint32_t tag;
1151     pa_bool_t volume_set = !!volume;
1152     pa_cvolume cv;
1153     uint32_t i;
1154
1155     pa_assert(s);
1156     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1157     pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1158
1159     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1160     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1161     PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1162     PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1163                                               PA_STREAM_INTERPOLATE_TIMING|
1164                                               PA_STREAM_NOT_MONOTONIC|
1165                                               PA_STREAM_AUTO_TIMING_UPDATE|
1166                                               PA_STREAM_NO_REMAP_CHANNELS|
1167                                               PA_STREAM_NO_REMIX_CHANNELS|
1168                                               PA_STREAM_FIX_FORMAT|
1169                                               PA_STREAM_FIX_RATE|
1170                                               PA_STREAM_FIX_CHANNELS|
1171                                               PA_STREAM_DONT_MOVE|
1172                                               PA_STREAM_VARIABLE_RATE|
1173                                               PA_STREAM_PEAK_DETECT|
1174                                               PA_STREAM_START_MUTED|
1175                                               PA_STREAM_ADJUST_LATENCY|
1176                                               PA_STREAM_EARLY_REQUESTS|
1177                                               PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1178                                               PA_STREAM_START_UNMUTED|
1179                                               PA_STREAM_FAIL_ON_SUSPEND|
1180                                               PA_STREAM_RELATIVE_VOLUME|
1181                                               PA_STREAM_PASSTHROUGH)), PA_ERR_INVALID);
1182
1183
1184     PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1185     PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1186     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1187     /* Althought some of the other flags are not supported on older
1188      * version, we don't check for them here, because it doesn't hurt
1189      * when they are passed but actually not supported. This makes
1190      * client development easier */
1191
1192     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1193     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1194     PA_CHECK_VALIDITY(s->context, !volume || (pa_sample_spec_valid(&s->sample_spec) && volume->channels == s->sample_spec.channels), PA_ERR_INVALID);
1195     PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1196     PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1197
1198     pa_stream_ref(s);
1199
1200     s->direction = direction;
1201
1202     if (sync_stream)
1203         s->syncid = sync_stream->syncid;
1204
1205     if (attr)
1206         s->buffer_attr = *attr;
1207     patch_buffer_attr(s, &s->buffer_attr, &flags);
1208
1209     s->flags = flags;
1210     s->corked = !!(flags & PA_STREAM_START_CORKED);
1211
1212     if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1213         pa_usec_t x;
1214
1215         x = pa_rtclock_now();
1216
1217         pa_assert(!s->smoother);
1218         s->smoother = pa_smoother_new(
1219                 SMOOTHER_ADJUST_TIME,
1220                 SMOOTHER_HISTORY_TIME,
1221                 !(flags & PA_STREAM_NOT_MONOTONIC),
1222                 TRUE,
1223                 SMOOTHER_MIN_HISTORY,
1224                 x,
1225                 TRUE);
1226     }
1227
1228     if (!dev)
1229         dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1230
1231     t = pa_tagstruct_command(
1232             s->context,
1233             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1234             &tag);
1235
1236     if (s->context->version < 13)
1237         pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1238
1239     pa_tagstruct_put(
1240             t,
1241             PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1242             PA_TAG_CHANNEL_MAP, &s->channel_map,
1243             PA_TAG_U32, PA_INVALID_INDEX,
1244             PA_TAG_STRING, dev,
1245             PA_TAG_U32, s->buffer_attr.maxlength,
1246             PA_TAG_BOOLEAN, s->corked,
1247             PA_TAG_INVALID);
1248
1249     if (!volume) {
1250         if (pa_sample_spec_valid(&s->sample_spec))
1251             volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1252         else {
1253             /* This is not really relevant, since no volume was set, and
1254              * the real number of channels is embedded in the format_info
1255              * structure */
1256             volume = pa_cvolume_reset(&cv, PA_CHANNELS_MAX);
1257         }
1258     }
1259
1260     if (s->direction == PA_STREAM_PLAYBACK) {
1261         pa_tagstruct_put(
1262                 t,
1263                 PA_TAG_U32, s->buffer_attr.tlength,
1264                 PA_TAG_U32, s->buffer_attr.prebuf,
1265                 PA_TAG_U32, s->buffer_attr.minreq,
1266                 PA_TAG_U32, s->syncid,
1267                 PA_TAG_INVALID);
1268
1269         pa_tagstruct_put_cvolume(t, volume);
1270     } else
1271         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1272
1273     if (s->context->version >= 12) {
1274         pa_tagstruct_put(
1275                 t,
1276                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1277                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1278                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1279                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1280                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1281                 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1282                 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1283                 PA_TAG_INVALID);
1284     }
1285
1286     if (s->context->version >= 13) {
1287
1288         if (s->direction == PA_STREAM_PLAYBACK)
1289             pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1290         else
1291             pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1292
1293         pa_tagstruct_put(
1294                 t,
1295                 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1296                 PA_TAG_PROPLIST, s->proplist,
1297                 PA_TAG_INVALID);
1298
1299         if (s->direction == PA_STREAM_RECORD)
1300             pa_tagstruct_putu32(t, s->direct_on_input);
1301     }
1302
1303     if (s->context->version >= 14) {
1304
1305         if (s->direction == PA_STREAM_PLAYBACK)
1306             pa_tagstruct_put_boolean(t, volume_set);
1307
1308         pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1309     }
1310
1311     if (s->context->version >= 15) {
1312
1313         if (s->direction == PA_STREAM_PLAYBACK)
1314             pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1315
1316         pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1317         pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1318     }
1319
1320     if (s->context->version >= 17 && s->direction == PA_STREAM_PLAYBACK)
1321         pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1322
1323     if (s->context->version >= 18 && s->direction == PA_STREAM_PLAYBACK)
1324         pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1325
1326     if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
1327         || s->context->version >= 22) {
1328
1329         pa_tagstruct_putu8(t, s->n_formats);
1330         for (i = 0; i < s->n_formats; i++)
1331             pa_tagstruct_put_format_info(t, s->req_formats[i]);
1332     }
1333
1334     if (s->context->version >= 22 && s->direction == PA_STREAM_RECORD) {
1335         pa_tagstruct_put_cvolume(t, volume);
1336         pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1337         pa_tagstruct_put_boolean(t, volume_set);
1338         pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1339         pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1340         pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1341     }
1342
1343     pa_pstream_send_tagstruct(s->context->pstream, t);
1344     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1345
1346     pa_stream_set_state(s, PA_STREAM_CREATING);
1347
1348     pa_stream_unref(s);
1349     return 0;
1350 }
1351
1352 int pa_stream_connect_playback(
1353         pa_stream *s,
1354         const char *dev,
1355         const pa_buffer_attr *attr,
1356         pa_stream_flags_t flags,
1357         const pa_cvolume *volume,
1358         pa_stream *sync_stream) {
1359
1360     pa_assert(s);
1361     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1362
1363     return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1364 }
1365
1366 int pa_stream_connect_record(
1367         pa_stream *s,
1368         const char *dev,
1369         const pa_buffer_attr *attr,
1370         pa_stream_flags_t flags) {
1371
1372     pa_assert(s);
1373     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1374
1375     return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1376 }
1377
1378 int pa_stream_begin_write(
1379         pa_stream *s,
1380         void **data,
1381         size_t *nbytes) {
1382
1383     pa_assert(s);
1384     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1385
1386     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1387     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1388     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1389     PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1390     PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1391
1392     if (*nbytes != (size_t) -1) {
1393         size_t m, fs;
1394
1395         m = pa_mempool_block_size_max(s->context->mempool);
1396         fs = pa_frame_size(&s->sample_spec);
1397
1398         m = (m / fs) * fs;
1399         if (*nbytes > m)
1400             *nbytes = m;
1401     }
1402
1403     if (!s->write_memblock) {
1404         s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1405         s->write_data = pa_memblock_acquire(s->write_memblock);
1406     }
1407
1408     *data = s->write_data;
1409     *nbytes = pa_memblock_get_length(s->write_memblock);
1410
1411     return 0;
1412 }
1413
1414 int pa_stream_cancel_write(
1415         pa_stream *s) {
1416
1417     pa_assert(s);
1418     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1419
1420     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1421     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1422     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1423     PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1424
1425     pa_assert(s->write_data);
1426
1427     pa_memblock_release(s->write_memblock);
1428     pa_memblock_unref(s->write_memblock);
1429     s->write_memblock = NULL;
1430     s->write_data = NULL;
1431
1432     return 0;
1433 }
1434
1435 int pa_stream_write(
1436         pa_stream *s,
1437         const void *data,
1438         size_t length,
1439         pa_free_cb_t free_cb,
1440         int64_t offset,
1441         pa_seek_mode_t seek) {
1442
1443     pa_assert(s);
1444     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1445     pa_assert(data);
1446
1447     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1448     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1449     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1450     PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1451     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1452     PA_CHECK_VALIDITY(s->context,
1453                       !s->write_memblock ||
1454                       ((data >= s->write_data) &&
1455                        ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1456                       PA_ERR_INVALID);
1457     PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1458
1459     if (s->write_memblock) {
1460         pa_memchunk chunk;
1461
1462         /* pa_stream_write_begin() was called before */
1463
1464         pa_memblock_release(s->write_memblock);
1465
1466         chunk.memblock = s->write_memblock;
1467         chunk.index = (const char *) data - (const char *) s->write_data;
1468         chunk.length = length;
1469
1470         s->write_memblock = NULL;
1471         s->write_data = NULL;
1472
1473         pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1474         pa_memblock_unref(chunk.memblock);
1475
1476     } else {
1477         pa_seek_mode_t t_seek = seek;
1478         int64_t t_offset = offset;
1479         size_t t_length = length;
1480         const void *t_data = data;
1481
1482         /* pa_stream_write_begin() was not called before */
1483
1484         while (t_length > 0) {
1485             pa_memchunk chunk;
1486
1487             chunk.index = 0;
1488
1489             if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1490                 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1491                 chunk.length = t_length;
1492             } else {
1493                 void *d;
1494
1495                 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1496                 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1497
1498                 d = pa_memblock_acquire(chunk.memblock);
1499                 memcpy(d, t_data, chunk.length);
1500                 pa_memblock_release(chunk.memblock);
1501             }
1502
1503             pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1504
1505             t_offset = 0;
1506             t_seek = PA_SEEK_RELATIVE;
1507
1508             t_data = (const uint8_t*) t_data + chunk.length;
1509             t_length -= chunk.length;
1510
1511             pa_memblock_unref(chunk.memblock);
1512         }
1513
1514         if (free_cb && pa_pstream_get_shm(s->context->pstream))
1515             free_cb((void*) data);
1516     }
1517
1518     /* This is obviously wrong since we ignore the seeking index . But
1519      * that's OK, the server side applies the same error */
1520     s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1521
1522     /* pa_log("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes); */
1523
1524     if (s->direction == PA_STREAM_PLAYBACK) {
1525
1526         /* Update latency request correction */
1527         if (s->write_index_corrections[s->current_write_index_correction].valid) {
1528
1529             if (seek == PA_SEEK_ABSOLUTE) {
1530                 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1531                 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1532                 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1533             } else if (seek == PA_SEEK_RELATIVE) {
1534                 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1535                     s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1536             } else
1537                 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1538         }
1539
1540         /* Update the write index in the already available latency data */
1541         if (s->timing_info_valid) {
1542
1543             if (seek == PA_SEEK_ABSOLUTE) {
1544                 s->timing_info.write_index_corrupt = FALSE;
1545                 s->timing_info.write_index = offset + (int64_t) length;
1546             } else if (seek == PA_SEEK_RELATIVE) {
1547                 if (!s->timing_info.write_index_corrupt)
1548                     s->timing_info.write_index += offset + (int64_t) length;
1549             } else
1550                 s->timing_info.write_index_corrupt = TRUE;
1551         }
1552
1553         if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1554             request_auto_timing_update(s, TRUE);
1555     }
1556
1557     return 0;
1558 }
1559
1560 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1561     pa_assert(s);
1562     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1563     pa_assert(data);
1564     pa_assert(length);
1565
1566     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1567     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1568     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1569
1570     if (!s->peek_memchunk.memblock) {
1571
1572         if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1573             *data = NULL;
1574             *length = 0;
1575             return 0;
1576         }
1577
1578         s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1579     }
1580
1581     pa_assert(s->peek_data);
1582     *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1583     *length = s->peek_memchunk.length;
1584     return 0;
1585 }
1586
1587 int pa_stream_drop(pa_stream *s) {
1588     pa_assert(s);
1589     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1590
1591     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1592     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1593     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1594     PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1595
1596     pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1597
1598     /* Fix the simulated local read index */
1599     if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1600         s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1601
1602     pa_assert(s->peek_data);
1603     pa_memblock_release(s->peek_memchunk.memblock);
1604     pa_memblock_unref(s->peek_memchunk.memblock);
1605     pa_memchunk_reset(&s->peek_memchunk);
1606
1607     return 0;
1608 }
1609
1610 size_t pa_stream_writable_size(pa_stream *s) {
1611     pa_assert(s);
1612     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1613
1614     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1615     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1616     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1617
1618     return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1619 }
1620
1621 size_t pa_stream_readable_size(pa_stream *s) {
1622     pa_assert(s);
1623     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1624
1625     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1626     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1627     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1628
1629     return pa_memblockq_get_length(s->record_memblockq);
1630 }
1631
1632 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1633     pa_operation *o;
1634     pa_tagstruct *t;
1635     uint32_t tag;
1636
1637     pa_assert(s);
1638     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1639
1640     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1641     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1642     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1643
1644     /* Ask for a timing update before we cork/uncork to get the best
1645      * accuracy for the transport latency suitable for the
1646      * check_smoother_status() call in the started callback */
1647     request_auto_timing_update(s, TRUE);
1648
1649     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1650
1651     t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1652     pa_tagstruct_putu32(t, s->channel);
1653     pa_pstream_send_tagstruct(s->context->pstream, t);
1654     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1655
1656     /* This might cause the read index to continue again, hence
1657      * let's request a timing update */
1658     request_auto_timing_update(s, TRUE);
1659
1660     return o;
1661 }
1662
1663 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1664     pa_usec_t usec;
1665
1666     pa_assert(s);
1667     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1668     pa_assert(s->state == PA_STREAM_READY);
1669     pa_assert(s->direction != PA_STREAM_UPLOAD);
1670     pa_assert(s->timing_info_valid);
1671     pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1672     pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1673
1674     if (s->direction == PA_STREAM_PLAYBACK) {
1675         /* The last byte that was written into the output device
1676          * had this time value associated */
1677         usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1678
1679         if (!s->corked && !s->suspended) {
1680
1681             if (!ignore_transport)
1682                 /* Because the latency info took a little time to come
1683                  * to us, we assume that the real output time is actually
1684                  * a little ahead */
1685                 usec += s->timing_info.transport_usec;
1686
1687             /* However, the output device usually maintains a buffer
1688                too, hence the real sample currently played is a little
1689                back  */
1690             if (s->timing_info.sink_usec >= usec)
1691                 usec = 0;
1692             else
1693                 usec -= s->timing_info.sink_usec;
1694         }
1695
1696     } else {
1697         pa_assert(s->direction == PA_STREAM_RECORD);
1698
1699         /* The last byte written into the server side queue had
1700          * this time value associated */
1701         usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1702
1703         if (!s->corked && !s->suspended) {
1704
1705             if (!ignore_transport)
1706                 /* Add transport latency */
1707                 usec += s->timing_info.transport_usec;
1708
1709             /* Add latency of data in device buffer */
1710             usec += s->timing_info.source_usec;
1711
1712             /* If this is a monitor source, we need to correct the
1713              * time by the playback device buffer */
1714             if (s->timing_info.sink_usec >= usec)
1715                 usec = 0;
1716             else
1717                 usec -= s->timing_info.sink_usec;
1718         }
1719     }
1720
1721     return usec;
1722 }
1723
1724 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1725     pa_operation *o = userdata;
1726     struct timeval local, remote, now;
1727     pa_timing_info *i;
1728     pa_bool_t playing = FALSE;
1729     uint64_t underrun_for = 0, playing_for = 0;
1730
1731     pa_assert(pd);
1732     pa_assert(o);
1733     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1734
1735     if (!o->context || !o->stream)
1736         goto finish;
1737
1738     i = &o->stream->timing_info;
1739
1740     o->stream->timing_info_valid = FALSE;
1741     i->write_index_corrupt = TRUE;
1742     i->read_index_corrupt = TRUE;
1743
1744     if (command != PA_COMMAND_REPLY) {
1745         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1746             goto finish;
1747
1748     } else {
1749
1750         if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1751             pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1752             pa_tagstruct_get_boolean(t, &playing) < 0 ||
1753             pa_tagstruct_get_timeval(t, &local) < 0 ||
1754             pa_tagstruct_get_timeval(t, &remote) < 0 ||
1755             pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1756             pa_tagstruct_gets64(t, &i->read_index) < 0) {
1757
1758             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1759             goto finish;
1760         }
1761
1762         if (o->context->version >= 13 &&
1763             o->stream->direction == PA_STREAM_PLAYBACK)
1764             if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1765                 pa_tagstruct_getu64(t, &playing_for) < 0) {
1766
1767                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1768                 goto finish;
1769             }
1770
1771
1772         if (!pa_tagstruct_eof(t)) {
1773             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1774             goto finish;
1775         }
1776         o->stream->timing_info_valid = TRUE;
1777         i->write_index_corrupt = FALSE;
1778         i->read_index_corrupt = FALSE;
1779
1780         i->playing = (int) playing;
1781         i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1782
1783         pa_gettimeofday(&now);
1784
1785         /* Calculcate timestamps */
1786         if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1787             /* local and remote seem to have synchronized clocks */
1788
1789             if (o->stream->direction == PA_STREAM_PLAYBACK)
1790                 i->transport_usec = pa_timeval_diff(&remote, &local);
1791             else
1792                 i->transport_usec = pa_timeval_diff(&now, &remote);
1793
1794             i->synchronized_clocks = TRUE;
1795             i->timestamp = remote;
1796         } else {
1797             /* clocks are not synchronized, let's estimate latency then */
1798             i->transport_usec = pa_timeval_diff(&now, &local)/2;
1799             i->synchronized_clocks = FALSE;
1800             i->timestamp = local;
1801             pa_timeval_add(&i->timestamp, i->transport_usec);
1802         }
1803
1804         /* Invalidate read and write indexes if necessary */
1805         if (tag < o->stream->read_index_not_before)
1806             i->read_index_corrupt = TRUE;
1807
1808         if (tag < o->stream->write_index_not_before)
1809             i->write_index_corrupt = TRUE;
1810
1811         if (o->stream->direction == PA_STREAM_PLAYBACK) {
1812             /* Write index correction */
1813
1814             int n, j;
1815             uint32_t ctag = tag;
1816
1817             /* Go through the saved correction values and add up the
1818              * total correction.*/
1819             for (n = 0, j = o->stream->current_write_index_correction+1;
1820                  n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1821                  n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1822
1823                 /* Step over invalid data or out-of-date data */
1824                 if (!o->stream->write_index_corrections[j].valid ||
1825                     o->stream->write_index_corrections[j].tag < ctag)
1826                     continue;
1827
1828                 /* Make sure that everything is in order */
1829                 ctag = o->stream->write_index_corrections[j].tag+1;
1830
1831                 /* Now fix the write index */
1832                 if (o->stream->write_index_corrections[j].corrupt) {
1833                     /* A corrupting seek was made */
1834                     i->write_index_corrupt = TRUE;
1835                 } else if (o->stream->write_index_corrections[j].absolute) {
1836                     /* An absolute seek was made */
1837                     i->write_index = o->stream->write_index_corrections[j].value;
1838                     i->write_index_corrupt = FALSE;
1839                 } else if (!i->write_index_corrupt) {
1840                     /* A relative seek was made */
1841                     i->write_index += o->stream->write_index_corrections[j].value;
1842                 }
1843             }
1844
1845             /* Clear old correction entries */
1846             for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1847                 if (!o->stream->write_index_corrections[n].valid)
1848                     continue;
1849
1850                 if (o->stream->write_index_corrections[n].tag <= tag)
1851                     o->stream->write_index_corrections[n].valid = FALSE;
1852             }
1853         }
1854
1855         if (o->stream->direction == PA_STREAM_RECORD) {
1856             /* Read index correction */
1857
1858             if (!i->read_index_corrupt)
1859                 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1860         }
1861
1862         /* Update smoother if we're not corked */
1863         if (o->stream->smoother && !o->stream->corked) {
1864             pa_usec_t u, x;
1865
1866             u = x = pa_rtclock_now() - i->transport_usec;
1867
1868             if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1869                 pa_usec_t su;
1870
1871                 /* If we weren't playing then it will take some time
1872                  * until the audio will actually come out through the
1873                  * speakers. Since we follow that timing here, we need
1874                  * to try to fix this up */
1875
1876                 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1877
1878                 if (su < i->sink_usec)
1879                     x += i->sink_usec - su;
1880             }
1881
1882             if (!i->playing)
1883                 pa_smoother_pause(o->stream->smoother, x);
1884
1885             /* Update the smoother */
1886             if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1887                 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1888                 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1889
1890             if (i->playing)
1891                 pa_smoother_resume(o->stream->smoother, x, TRUE);
1892         }
1893     }
1894
1895     o->stream->auto_timing_update_requested = FALSE;
1896
1897     if (o->stream->latency_update_callback)
1898         o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1899
1900     if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1901         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1902         cb(o->stream, o->stream->timing_info_valid, o->userdata);
1903     }
1904
1905 finish:
1906
1907     pa_operation_done(o);
1908     pa_operation_unref(o);
1909 }
1910
1911 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1912     uint32_t tag;
1913     pa_operation *o;
1914     pa_tagstruct *t;
1915     struct timeval now;
1916     int cidx = 0;
1917
1918     pa_assert(s);
1919     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1920
1921     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1922     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1923     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1924
1925     if (s->direction == PA_STREAM_PLAYBACK) {
1926         /* Find a place to store the write_index correction data for this entry */
1927         cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1928
1929         /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1930         PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1931     }
1932     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1933
1934     t = pa_tagstruct_command(
1935             s->context,
1936             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1937             &tag);
1938     pa_tagstruct_putu32(t, s->channel);
1939     pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1940
1941     pa_pstream_send_tagstruct(s->context->pstream, t);
1942     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1943
1944     if (s->direction == PA_STREAM_PLAYBACK) {
1945         /* Fill in initial correction data */
1946
1947         s->current_write_index_correction = cidx;
1948
1949         s->write_index_corrections[cidx].valid = TRUE;
1950         s->write_index_corrections[cidx].absolute = FALSE;
1951         s->write_index_corrections[cidx].corrupt = FALSE;
1952         s->write_index_corrections[cidx].tag = tag;
1953         s->write_index_corrections[cidx].value = 0;
1954     }
1955
1956     return o;
1957 }
1958
1959 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1960     pa_stream *s = userdata;
1961
1962     pa_assert(pd);
1963     pa_assert(s);
1964     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1965
1966     pa_stream_ref(s);
1967
1968     if (command != PA_COMMAND_REPLY) {
1969         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1970             goto finish;
1971
1972         pa_stream_set_state(s, PA_STREAM_FAILED);
1973         goto finish;
1974     } else if (!pa_tagstruct_eof(t)) {
1975         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1976         goto finish;
1977     }
1978
1979     pa_stream_set_state(s, PA_STREAM_TERMINATED);
1980
1981 finish:
1982     pa_stream_unref(s);
1983 }
1984
1985 int pa_stream_disconnect(pa_stream *s) {
1986     pa_tagstruct *t;
1987     uint32_t tag;
1988
1989     pa_assert(s);
1990     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1991
1992     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1993     PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1994     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1995
1996     pa_stream_ref(s);
1997
1998     t = pa_tagstruct_command(
1999             s->context,
2000             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
2001                         (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
2002             &tag);
2003     pa_tagstruct_putu32(t, s->channel);
2004     pa_pstream_send_tagstruct(s->context->pstream, t);
2005     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
2006
2007     pa_stream_unref(s);
2008     return 0;
2009 }
2010
2011 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2012     pa_assert(s);
2013     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2014
2015     if (pa_detect_fork())
2016         return;
2017
2018     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2019         return;
2020
2021     s->read_callback = cb;
2022     s->read_userdata = userdata;
2023 }
2024
2025 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2026     pa_assert(s);
2027     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2028
2029     if (pa_detect_fork())
2030         return;
2031
2032     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2033         return;
2034
2035     s->write_callback = cb;
2036     s->write_userdata = userdata;
2037 }
2038
2039 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2040     pa_assert(s);
2041     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2042
2043     if (pa_detect_fork())
2044         return;
2045
2046     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2047         return;
2048
2049     s->state_callback = cb;
2050     s->state_userdata = userdata;
2051 }
2052
2053 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2054     pa_assert(s);
2055     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2056
2057     if (pa_detect_fork())
2058         return;
2059
2060     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2061         return;
2062
2063     s->overflow_callback = cb;
2064     s->overflow_userdata = userdata;
2065 }
2066
2067 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2068     pa_assert(s);
2069     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2070
2071     if (pa_detect_fork())
2072         return;
2073
2074     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2075         return;
2076
2077     s->underflow_callback = cb;
2078     s->underflow_userdata = userdata;
2079 }
2080
2081 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2082     pa_assert(s);
2083     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2084
2085     if (pa_detect_fork())
2086         return;
2087
2088     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2089         return;
2090
2091     s->latency_update_callback = cb;
2092     s->latency_update_userdata = userdata;
2093 }
2094
2095 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2096     pa_assert(s);
2097     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2098
2099     if (pa_detect_fork())
2100         return;
2101
2102     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2103         return;
2104
2105     s->moved_callback = cb;
2106     s->moved_userdata = userdata;
2107 }
2108
2109 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2110     pa_assert(s);
2111     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2112
2113     if (pa_detect_fork())
2114         return;
2115
2116     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2117         return;
2118
2119     s->suspended_callback = cb;
2120     s->suspended_userdata = userdata;
2121 }
2122
2123 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2124     pa_assert(s);
2125     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2126
2127     if (pa_detect_fork())
2128         return;
2129
2130     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2131         return;
2132
2133     s->started_callback = cb;
2134     s->started_userdata = userdata;
2135 }
2136
2137 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
2138     pa_assert(s);
2139     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2140
2141     if (pa_detect_fork())
2142         return;
2143
2144     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2145         return;
2146
2147     s->event_callback = cb;
2148     s->event_userdata = userdata;
2149 }
2150
2151 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2152     pa_assert(s);
2153     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2154
2155     if (pa_detect_fork())
2156         return;
2157
2158     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2159         return;
2160
2161     s->buffer_attr_callback = cb;
2162     s->buffer_attr_userdata = userdata;
2163 }
2164
2165 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2166     pa_operation *o = userdata;
2167     int success = 1;
2168
2169     pa_assert(pd);
2170     pa_assert(o);
2171     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2172
2173     if (!o->context)
2174         goto finish;
2175
2176     if (command != PA_COMMAND_REPLY) {
2177         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2178             goto finish;
2179
2180         success = 0;
2181     } else if (!pa_tagstruct_eof(t)) {
2182         pa_context_fail(o->context, PA_ERR_PROTOCOL);
2183         goto finish;
2184     }
2185
2186     if (o->callback) {
2187         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2188         cb(o->stream, success, o->userdata);
2189     }
2190
2191 finish:
2192     pa_operation_done(o);
2193     pa_operation_unref(o);
2194 }
2195
2196 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2197     pa_operation *o;
2198     pa_tagstruct *t;
2199     uint32_t tag;
2200
2201     pa_assert(s);
2202     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2203
2204     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2205     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2206     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2207
2208     /* Ask for a timing update before we cork/uncork to get the best
2209      * accuracy for the transport latency suitable for the
2210      * check_smoother_status() call in the started callback */
2211     request_auto_timing_update(s, TRUE);
2212
2213     s->corked = b;
2214
2215     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2216
2217     t = pa_tagstruct_command(
2218             s->context,
2219             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2220             &tag);
2221     pa_tagstruct_putu32(t, s->channel);
2222     pa_tagstruct_put_boolean(t, !!b);
2223     pa_pstream_send_tagstruct(s->context->pstream, t);
2224     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2225
2226     check_smoother_status(s, FALSE, FALSE, FALSE);
2227
2228     /* This might cause the indexes to hang/start again, hence let's
2229      * request a timing update, after the cork/uncork, too */
2230     request_auto_timing_update(s, TRUE);
2231
2232     return o;
2233 }
2234
2235 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2236     pa_tagstruct *t;
2237     pa_operation *o;
2238     uint32_t tag;
2239
2240     pa_assert(s);
2241     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2242
2243     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2244     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2245
2246     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2247
2248     t = pa_tagstruct_command(s->context, command, &tag);
2249     pa_tagstruct_putu32(t, s->channel);
2250     pa_pstream_send_tagstruct(s->context->pstream, t);
2251     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2252
2253     return o;
2254 }
2255
2256 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2257     pa_operation *o;
2258
2259     pa_assert(s);
2260     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2261
2262     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2263     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2264     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2265
2266     /* Ask for a timing update *before* the flush, so that the
2267      * transport usec is as up to date as possible when we get the
2268      * underflow message and update the smoother status*/
2269     request_auto_timing_update(s, TRUE);
2270
2271     if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2272         return NULL;
2273
2274     if (s->direction == PA_STREAM_PLAYBACK) {
2275
2276         if (s->write_index_corrections[s->current_write_index_correction].valid)
2277             s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2278
2279         if (s->buffer_attr.prebuf > 0)
2280             check_smoother_status(s, FALSE, FALSE, TRUE);
2281
2282         /* This will change the write index, but leave the
2283          * read index untouched. */
2284         invalidate_indexes(s, FALSE, TRUE);
2285
2286     } else
2287         /* For record streams this has no influence on the write
2288          * index, but the read index might jump. */
2289         invalidate_indexes(s, TRUE, FALSE);
2290
2291     /* Note that we do not update requested_bytes here. This is
2292      * because we cannot really know how data actually was dropped
2293      * from the write index due to this. This 'error' will be applied
2294      * by both client and server and hence we should be fine. */
2295
2296     return o;
2297 }
2298
2299 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2300     pa_operation *o;
2301
2302     pa_assert(s);
2303     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2304
2305     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2306     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2307     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2308     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2309
2310     /* Ask for a timing update before we cork/uncork to get the best
2311      * accuracy for the transport latency suitable for the
2312      * check_smoother_status() call in the started callback */
2313     request_auto_timing_update(s, TRUE);
2314
2315     if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2316         return NULL;
2317
2318     /* This might cause the read index to hang again, hence
2319      * let's request a timing update */
2320     request_auto_timing_update(s, TRUE);
2321
2322     return o;
2323 }
2324
2325 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2326     pa_operation *o;
2327
2328     pa_assert(s);
2329     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2330
2331     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2332     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2333     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2334     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2335
2336     /* Ask for a timing update before we cork/uncork to get the best
2337      * accuracy for the transport latency suitable for the
2338      * check_smoother_status() call in the started callback */
2339     request_auto_timing_update(s, TRUE);
2340
2341     if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2342         return NULL;
2343
2344     /* This might cause the read index to start moving again, hence
2345      * let's request a timing update */
2346     request_auto_timing_update(s, TRUE);
2347
2348     return o;
2349 }
2350
2351 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2352     pa_operation *o;
2353
2354     pa_assert(s);
2355     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2356     pa_assert(name);
2357
2358     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2359     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2360     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2361
2362     if (s->context->version >= 13) {
2363         pa_proplist *p = pa_proplist_new();
2364
2365         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2366         o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2367         pa_proplist_free(p);
2368     } else {
2369         pa_tagstruct *t;
2370         uint32_t tag;
2371
2372         o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2373         t = pa_tagstruct_command(
2374                 s->context,
2375                 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2376                 &tag);
2377         pa_tagstruct_putu32(t, s->channel);
2378         pa_tagstruct_puts(t, name);
2379         pa_pstream_send_tagstruct(s->context->pstream, t);
2380         pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2381     }
2382
2383     return o;
2384 }
2385
2386 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2387     pa_usec_t usec;
2388
2389     pa_assert(s);
2390     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2391
2392     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2393     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2394     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2395     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2396     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2397     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2398
2399     if (s->smoother)
2400         usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2401     else
2402         usec = calc_time(s, FALSE);
2403
2404     /* Make sure the time runs monotonically */
2405     if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2406         if (usec < s->previous_time)
2407             usec = s->previous_time;
2408         else
2409             s->previous_time = usec;
2410     }
2411
2412     if (r_usec)
2413         *r_usec = usec;
2414
2415     return 0;
2416 }
2417
2418 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2419     pa_assert(s);
2420     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2421
2422     if (negative)
2423         *negative = 0;
2424
2425     if (a >= b)
2426         return a-b;
2427     else {
2428         if (negative && s->direction == PA_STREAM_RECORD) {
2429             *negative = 1;
2430             return b-a;
2431         } else
2432             return 0;
2433     }
2434 }
2435
2436 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2437     pa_usec_t t, c;
2438     int r;
2439     int64_t cindex;
2440
2441     pa_assert(s);
2442     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2443     pa_assert(r_usec);
2444
2445     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2446     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2447     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2448     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2449     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2450     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2451
2452     if ((r = pa_stream_get_time(s, &t)) < 0)
2453         return r;
2454
2455     if (s->direction == PA_STREAM_PLAYBACK)
2456         cindex = s->timing_info.write_index;
2457     else
2458         cindex = s->timing_info.read_index;
2459
2460     if (cindex < 0)
2461         cindex = 0;
2462
2463     c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2464
2465     if (s->direction == PA_STREAM_PLAYBACK)
2466         *r_usec = time_counter_diff(s, c, t, negative);
2467     else
2468         *r_usec = time_counter_diff(s, t, c, negative);
2469
2470     return 0;
2471 }
2472
2473 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2474     pa_assert(s);
2475     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2476
2477     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2478     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2479     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2480     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2481
2482     return &s->timing_info;
2483 }
2484
2485 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2486     pa_assert(s);
2487     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2488
2489     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2490
2491     return &s->sample_spec;
2492 }
2493
2494 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2495     pa_assert(s);
2496     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2497
2498     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2499
2500     return &s->channel_map;
2501 }
2502
2503 const pa_format_info* pa_stream_get_format_info(pa_stream *s) {
2504     pa_assert(s);
2505     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2506
2507     /* We don't have the format till routing is done */
2508     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2509     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2510
2511     return s->format;
2512 }
2513 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2514     pa_assert(s);
2515     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2516
2517     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2518     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2519     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2520
2521     return &s->buffer_attr;
2522 }
2523
2524 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2525     pa_operation *o = userdata;
2526     int success = 1;
2527
2528     pa_assert(pd);
2529     pa_assert(o);
2530     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2531
2532     if (!o->context)
2533         goto finish;
2534
2535     if (command != PA_COMMAND_REPLY) {
2536         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2537             goto finish;
2538
2539         success = 0;
2540     } else {
2541         if (o->stream->direction == PA_STREAM_PLAYBACK) {
2542             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2543                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2544                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2545                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2546                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2547                 goto finish;
2548             }
2549         } else if (o->stream->direction == PA_STREAM_RECORD) {
2550             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2551                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2552                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2553                 goto finish;
2554             }
2555         }
2556
2557         if (o->stream->context->version >= 13) {
2558             pa_usec_t usec;
2559
2560             if (pa_tagstruct_get_usec(t, &usec) < 0) {
2561                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2562                 goto finish;
2563             }
2564
2565             if (o->stream->direction == PA_STREAM_RECORD)
2566                 o->stream->timing_info.configured_source_usec = usec;
2567             else
2568                 o->stream->timing_info.configured_sink_usec = usec;
2569         }
2570
2571         if (!pa_tagstruct_eof(t)) {
2572             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2573             goto finish;
2574         }
2575     }
2576
2577     if (o->callback) {
2578         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2579         cb(o->stream, success, o->userdata);
2580     }
2581
2582 finish:
2583     pa_operation_done(o);
2584     pa_operation_unref(o);
2585 }
2586
2587
2588 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2589     pa_operation *o;
2590     pa_tagstruct *t;
2591     uint32_t tag;
2592     pa_buffer_attr copy;
2593
2594     pa_assert(s);
2595     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2596     pa_assert(attr);
2597
2598     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2599     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2600     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2601     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2602
2603     /* Ask for a timing update before we cork/uncork to get the best
2604      * accuracy for the transport latency suitable for the
2605      * check_smoother_status() call in the started callback */
2606     request_auto_timing_update(s, TRUE);
2607
2608     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2609
2610     t = pa_tagstruct_command(
2611             s->context,
2612             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2613             &tag);
2614     pa_tagstruct_putu32(t, s->channel);
2615
2616     copy = *attr;
2617     patch_buffer_attr(s, &copy, NULL);
2618     attr = &copy;
2619
2620     pa_tagstruct_putu32(t, attr->maxlength);
2621
2622     if (s->direction == PA_STREAM_PLAYBACK)
2623         pa_tagstruct_put(
2624                 t,
2625                 PA_TAG_U32, attr->tlength,
2626                 PA_TAG_U32, attr->prebuf,
2627                 PA_TAG_U32, attr->minreq,
2628                 PA_TAG_INVALID);
2629     else
2630         pa_tagstruct_putu32(t, attr->fragsize);
2631
2632     if (s->context->version >= 13)
2633         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2634
2635     if (s->context->version >= 14)
2636         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2637
2638     pa_pstream_send_tagstruct(s->context->pstream, t);
2639     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2640
2641     /* This might cause changes in the read/write indexex, hence let's
2642      * request a timing update */
2643     request_auto_timing_update(s, TRUE);
2644
2645     return o;
2646 }
2647
2648 uint32_t pa_stream_get_device_index(pa_stream *s) {
2649     pa_assert(s);
2650     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2651
2652     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2653     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2654     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2655     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2656     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2657
2658     return s->device_index;
2659 }
2660
2661 const char *pa_stream_get_device_name(pa_stream *s) {
2662     pa_assert(s);
2663     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2664
2665     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2666     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2667     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2668     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2669     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2670
2671     return s->device_name;
2672 }
2673
2674 int pa_stream_is_suspended(pa_stream *s) {
2675     pa_assert(s);
2676     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2677
2678     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2679     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2680     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2681     PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2682
2683     return s->suspended;
2684 }
2685
2686 int pa_stream_is_corked(pa_stream *s) {
2687     pa_assert(s);
2688     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2689
2690     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2691     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2692     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2693
2694     return s->corked;
2695 }
2696
2697 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2698     pa_operation *o = userdata;
2699     int success = 1;
2700
2701     pa_assert(pd);
2702     pa_assert(o);
2703     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2704
2705     if (!o->context)
2706         goto finish;
2707
2708     if (command != PA_COMMAND_REPLY) {
2709         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2710             goto finish;
2711
2712         success = 0;
2713     } else {
2714
2715         if (!pa_tagstruct_eof(t)) {
2716             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2717             goto finish;
2718         }
2719     }
2720
2721     o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2722     pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2723
2724     if (o->callback) {
2725         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2726         cb(o->stream, success, o->userdata);
2727     }
2728
2729 finish:
2730     pa_operation_done(o);
2731     pa_operation_unref(o);
2732 }
2733
2734
2735 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2736     pa_operation *o;
2737     pa_tagstruct *t;
2738     uint32_t tag;
2739
2740     pa_assert(s);
2741     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2742
2743     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2744     PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2745     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2746     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2747     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2748     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2749
2750     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2751     o->private = PA_UINT_TO_PTR(rate);
2752
2753     t = pa_tagstruct_command(
2754             s->context,
2755             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2756             &tag);
2757     pa_tagstruct_putu32(t, s->channel);
2758     pa_tagstruct_putu32(t, rate);
2759
2760     pa_pstream_send_tagstruct(s->context->pstream, t);
2761     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2762
2763     return o;
2764 }
2765
2766 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2767     pa_operation *o;
2768     pa_tagstruct *t;
2769     uint32_t tag;
2770
2771     pa_assert(s);
2772     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2773
2774     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2775     PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2776     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2777     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2778     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2779
2780     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2781
2782     t = pa_tagstruct_command(
2783             s->context,
2784             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2785             &tag);
2786     pa_tagstruct_putu32(t, s->channel);
2787     pa_tagstruct_putu32(t, (uint32_t) mode);
2788     pa_tagstruct_put_proplist(t, p);
2789
2790     pa_pstream_send_tagstruct(s->context->pstream, t);
2791     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2792
2793     /* Please note that we don't update s->proplist here, because we
2794      * don't export that field */
2795
2796     return o;
2797 }
2798
2799 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2800     pa_operation *o;
2801     pa_tagstruct *t;
2802     uint32_t tag;
2803     const char * const*k;
2804
2805     pa_assert(s);
2806     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2807
2808     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2809     PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2810     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2811     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2812     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2813
2814     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2815
2816     t = pa_tagstruct_command(
2817             s->context,
2818             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2819             &tag);
2820     pa_tagstruct_putu32(t, s->channel);
2821
2822     for (k = keys; *k; k++)
2823         pa_tagstruct_puts(t, *k);
2824
2825     pa_tagstruct_puts(t, NULL);
2826
2827     pa_pstream_send_tagstruct(s->context->pstream, t);
2828     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2829
2830     /* Please note that we don't update s->proplist here, because we
2831      * don't export that field */
2832
2833     return o;
2834 }
2835
2836 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2837     pa_assert(s);
2838     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2839
2840     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2841     PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2842     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2843     PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2844
2845     s->direct_on_input = sink_input_idx;
2846
2847     return 0;
2848 }
2849
2850 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2851     pa_assert(s);
2852     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2853
2854     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2855     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2856     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2857
2858     return s->direct_on_input;
2859 }