Merge branch 'master' of ssh://rootserver/home/lennart/git/public/pulseaudio
[profile/ivi/pulseaudio.git] / src / pulse / stream.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/xmalloc.h>
34
35 #include <pulsecore/pstream-util.h>
36 #include <pulsecore/log.h>
37 #include <pulsecore/hashmap.h>
38 #include <pulsecore/macro.h>
39 #include <pulsecore/rtclock.h>
40
41 #include "internal.h"
42
43 #define LATENCY_IPOL_INTERVAL_USEC (333*PA_USEC_PER_MSEC)
44
45 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
46 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
47 #define SMOOTHER_MIN_HISTORY (4)
48
49 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
50     return pa_stream_new_with_proplist(c, name, ss, map, NULL);
51 }
52
53 static void reset_callbacks(pa_stream *s) {
54     s->read_callback = NULL;
55     s->read_userdata = NULL;
56     s->write_callback = NULL;
57     s->write_userdata = NULL;
58     s->state_callback = NULL;
59     s->state_userdata = NULL;
60     s->overflow_callback = NULL;
61     s->overflow_userdata = NULL;
62     s->underflow_callback = NULL;
63     s->underflow_userdata = NULL;
64     s->latency_update_callback = NULL;
65     s->latency_update_userdata = NULL;
66     s->moved_callback = NULL;
67     s->moved_userdata = NULL;
68     s->suspended_callback = NULL;
69     s->suspended_userdata = NULL;
70     s->started_callback = NULL;
71     s->started_userdata = NULL;
72     s->event_callback = NULL;
73     s->event_userdata = NULL;
74 }
75
76 pa_stream *pa_stream_new_with_proplist(
77         pa_context *c,
78         const char *name,
79         const pa_sample_spec *ss,
80         const pa_channel_map *map,
81         pa_proplist *p) {
82
83     pa_stream *s;
84     int i;
85     pa_channel_map tmap;
86
87     pa_assert(c);
88     pa_assert(PA_REFCNT_VALUE(c) >= 1);
89
90     PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
91     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
92     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
93     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
94     PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
95     PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
96
97     if (!map)
98         PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
99
100     s = pa_xnew(pa_stream, 1);
101     PA_REFCNT_INIT(s);
102     s->context = c;
103     s->mainloop = c->mainloop;
104
105     s->direction = PA_STREAM_NODIRECTION;
106     s->state = PA_STREAM_UNCONNECTED;
107     s->flags = 0;
108
109     s->sample_spec = *ss;
110     s->channel_map = *map;
111
112     s->direct_on_input = PA_INVALID_INDEX;
113
114     s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
115     if (name)
116         pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
117
118     s->channel = 0;
119     s->channel_valid = FALSE;
120     s->syncid = c->csyncid++;
121     s->stream_index = PA_INVALID_INDEX;
122
123     s->requested_bytes = 0;
124     memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
125
126     /* We initialize der target length here, so that if the user
127      * passes no explicit buffering metrics the default is similar to
128      * what older PA versions provided. */
129
130     s->buffer_attr.maxlength = (uint32_t) -1;
131     s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
132     s->buffer_attr.minreq = (uint32_t) -1;
133     s->buffer_attr.prebuf = (uint32_t) -1;
134     s->buffer_attr.fragsize = (uint32_t) -1;
135
136     s->device_index = PA_INVALID_INDEX;
137     s->device_name = NULL;
138     s->suspended = FALSE;
139
140     pa_memchunk_reset(&s->peek_memchunk);
141     s->peek_data = NULL;
142
143     s->record_memblockq = NULL;
144
145     s->corked = FALSE;
146
147     memset(&s->timing_info, 0, sizeof(s->timing_info));
148     s->timing_info_valid = FALSE;
149
150     s->previous_time = 0;
151
152     s->read_index_not_before = 0;
153     s->write_index_not_before = 0;
154     for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
155         s->write_index_corrections[i].valid = 0;
156     s->current_write_index_correction = 0;
157
158     s->auto_timing_update_event = NULL;
159     s->auto_timing_update_requested = FALSE;
160
161     reset_callbacks(s);
162
163     s->smoother = NULL;
164
165     /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
166     PA_LLIST_PREPEND(pa_stream, c->streams, s);
167     pa_stream_ref(s);
168
169     return s;
170 }
171
172 static void stream_unlink(pa_stream *s) {
173     pa_operation *o, *n;
174     pa_assert(s);
175
176     if (!s->context)
177         return;
178
179     /* Detach from context */
180
181     /* Unref all operatio object that point to us */
182     for (o = s->context->operations; o; o = n) {
183         n = o->next;
184
185         if (o->stream == s)
186             pa_operation_cancel(o);
187     }
188
189     /* Drop all outstanding replies for this stream */
190     if (s->context->pdispatch)
191         pa_pdispatch_unregister_reply(s->context->pdispatch, s);
192
193     if (s->channel_valid) {
194         pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL);
195         s->channel = 0;
196         s->channel_valid = FALSE;
197     }
198
199     PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
200     pa_stream_unref(s);
201
202     s->context = NULL;
203
204     if (s->auto_timing_update_event) {
205         pa_assert(s->mainloop);
206         s->mainloop->time_free(s->auto_timing_update_event);
207     }
208
209     reset_callbacks(s);
210 }
211
212 static void stream_free(pa_stream *s) {
213     pa_assert(s);
214
215     stream_unlink(s);
216
217     if (s->peek_memchunk.memblock) {
218         if (s->peek_data)
219             pa_memblock_release(s->peek_memchunk.memblock);
220         pa_memblock_unref(s->peek_memchunk.memblock);
221     }
222
223     if (s->record_memblockq)
224         pa_memblockq_free(s->record_memblockq);
225
226     if (s->proplist)
227         pa_proplist_free(s->proplist);
228
229     if (s->smoother)
230         pa_smoother_free(s->smoother);
231
232     pa_xfree(s->device_name);
233     pa_xfree(s);
234 }
235
236 void pa_stream_unref(pa_stream *s) {
237     pa_assert(s);
238     pa_assert(PA_REFCNT_VALUE(s) >= 1);
239
240     if (PA_REFCNT_DEC(s) <= 0)
241         stream_free(s);
242 }
243
244 pa_stream* pa_stream_ref(pa_stream *s) {
245     pa_assert(s);
246     pa_assert(PA_REFCNT_VALUE(s) >= 1);
247
248     PA_REFCNT_INC(s);
249     return s;
250 }
251
252 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
253     pa_assert(s);
254     pa_assert(PA_REFCNT_VALUE(s) >= 1);
255
256     return s->state;
257 }
258
259 pa_context* pa_stream_get_context(pa_stream *s) {
260     pa_assert(s);
261     pa_assert(PA_REFCNT_VALUE(s) >= 1);
262
263     return s->context;
264 }
265
266 uint32_t pa_stream_get_index(pa_stream *s) {
267     pa_assert(s);
268     pa_assert(PA_REFCNT_VALUE(s) >= 1);
269
270     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
271
272     return s->stream_index;
273 }
274
275 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
276     pa_assert(s);
277     pa_assert(PA_REFCNT_VALUE(s) >= 1);
278
279     if (s->state == st)
280         return;
281
282     pa_stream_ref(s);
283
284     s->state = st;
285
286     if (s->state_callback)
287         s->state_callback(s, s->state_userdata);
288
289     if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
290         stream_unlink(s);
291
292     pa_stream_unref(s);
293 }
294
295 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
296     pa_assert(s);
297     pa_assert(PA_REFCNT_VALUE(s) >= 1);
298
299     if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
300         return;
301
302     if (s->state == PA_STREAM_READY &&
303         (force || !s->auto_timing_update_requested)) {
304         pa_operation *o;
305
306 /*         pa_log("automatically requesting new timing data"); */
307
308         if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
309             pa_operation_unref(o);
310             s->auto_timing_update_requested = TRUE;
311         }
312     }
313
314     if (s->auto_timing_update_event) {
315         struct timeval next;
316         pa_gettimeofday(&next);
317         pa_timeval_add(&next, LATENCY_IPOL_INTERVAL_USEC);
318         s->mainloop->time_restart(s->auto_timing_update_event, &next);
319     }
320 }
321
322 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
323     pa_context *c = userdata;
324     pa_stream *s;
325     uint32_t channel;
326
327     pa_assert(pd);
328     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
329     pa_assert(t);
330     pa_assert(c);
331     pa_assert(PA_REFCNT_VALUE(c) >= 1);
332
333     pa_context_ref(c);
334
335     if (pa_tagstruct_getu32(t, &channel) < 0 ||
336         !pa_tagstruct_eof(t)) {
337         pa_context_fail(c, PA_ERR_PROTOCOL);
338         goto finish;
339     }
340
341     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel)))
342         goto finish;
343
344     if (s->state != PA_STREAM_READY)
345         goto finish;
346
347     pa_context_set_error(c, PA_ERR_KILLED);
348     pa_stream_set_state(s, PA_STREAM_FAILED);
349
350 finish:
351     pa_context_unref(c);
352 }
353
354 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
355     pa_usec_t x;
356
357     pa_assert(s);
358     pa_assert(!force_start || !force_stop);
359
360     if (!s->smoother)
361         return;
362
363     x = pa_rtclock_usec();
364
365     if (s->timing_info_valid) {
366         if (aposteriori)
367             x -= s->timing_info.transport_usec;
368         else
369             x += s->timing_info.transport_usec;
370
371         if (s->direction == PA_STREAM_PLAYBACK)
372             /* it takes a while until the pause/resume is actually
373              * audible */
374             x += s->timing_info.sink_usec;
375         else
376             /* Data froma  while back will be dropped */
377             x -= s->timing_info.source_usec;
378     }
379
380     if (s->suspended || s->corked || force_stop)
381         pa_smoother_pause(s->smoother, x);
382     else if (force_start || s->buffer_attr.prebuf == 0)
383         pa_smoother_resume(s->smoother, x);
384
385     /* Please note that we have no idea if playback actually started
386      * if prebuf is non-zero! */
387 }
388
389 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
390     pa_context *c = userdata;
391     pa_stream *s;
392     uint32_t channel;
393     const char *dn;
394     pa_bool_t suspended;
395     uint32_t di;
396     pa_usec_t usec;
397     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
398
399     pa_assert(pd);
400     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
401     pa_assert(t);
402     pa_assert(c);
403     pa_assert(PA_REFCNT_VALUE(c) >= 1);
404
405     pa_context_ref(c);
406
407     if (c->version < 12) {
408         pa_context_fail(c, PA_ERR_PROTOCOL);
409         goto finish;
410     }
411
412     if (pa_tagstruct_getu32(t, &channel) < 0 ||
413         pa_tagstruct_getu32(t, &di) < 0 ||
414         pa_tagstruct_gets(t, &dn) < 0 ||
415         pa_tagstruct_get_boolean(t, &suspended) < 0) {
416         pa_context_fail(c, PA_ERR_PROTOCOL);
417         goto finish;
418     }
419
420     if (c->version >= 13) {
421
422         if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
423             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
424                 pa_tagstruct_getu32(t, &fragsize) < 0 ||
425                 pa_tagstruct_get_usec(t, &usec) < 0) {
426                 pa_context_fail(c, PA_ERR_PROTOCOL);
427                 goto finish;
428             }
429         } else {
430             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
431                 pa_tagstruct_getu32(t, &tlength) < 0 ||
432                 pa_tagstruct_getu32(t, &prebuf) < 0 ||
433                 pa_tagstruct_getu32(t, &minreq) < 0 ||
434                 pa_tagstruct_get_usec(t, &usec) < 0) {
435                 pa_context_fail(c, PA_ERR_PROTOCOL);
436                 goto finish;
437             }
438         }
439     }
440
441     if (!pa_tagstruct_eof(t)) {
442         pa_context_fail(c, PA_ERR_PROTOCOL);
443         goto finish;
444     }
445
446     if (!dn || di == PA_INVALID_INDEX) {
447         pa_context_fail(c, PA_ERR_PROTOCOL);
448         goto finish;
449     }
450
451     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, channel)))
452         goto finish;
453
454     if (s->state != PA_STREAM_READY)
455         goto finish;
456
457     if (c->version >= 13) {
458         if (s->direction == PA_STREAM_RECORD)
459             s->timing_info.configured_source_usec = usec;
460         else
461             s->timing_info.configured_sink_usec = usec;
462
463         s->buffer_attr.maxlength = maxlength;
464         s->buffer_attr.fragsize = fragsize;
465         s->buffer_attr.tlength = tlength;
466         s->buffer_attr.prebuf = prebuf;
467         s->buffer_attr.minreq = minreq;
468     }
469
470     pa_xfree(s->device_name);
471     s->device_name = pa_xstrdup(dn);
472     s->device_index = di;
473
474     s->suspended = suspended;
475
476     check_smoother_status(s, TRUE, FALSE, FALSE);
477     request_auto_timing_update(s, TRUE);
478
479     if (s->moved_callback)
480         s->moved_callback(s, s->moved_userdata);
481
482 finish:
483     pa_context_unref(c);
484 }
485
486 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
487     pa_context *c = userdata;
488     pa_stream *s;
489     uint32_t channel;
490     pa_bool_t suspended;
491
492     pa_assert(pd);
493     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
494     pa_assert(t);
495     pa_assert(c);
496     pa_assert(PA_REFCNT_VALUE(c) >= 1);
497
498     pa_context_ref(c);
499
500     if (c->version < 12) {
501         pa_context_fail(c, PA_ERR_PROTOCOL);
502         goto finish;
503     }
504
505     if (pa_tagstruct_getu32(t, &channel) < 0 ||
506         pa_tagstruct_get_boolean(t, &suspended) < 0 ||
507         !pa_tagstruct_eof(t)) {
508         pa_context_fail(c, PA_ERR_PROTOCOL);
509         goto finish;
510     }
511
512     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, channel)))
513         goto finish;
514
515     if (s->state != PA_STREAM_READY)
516         goto finish;
517
518     s->suspended = suspended;
519
520     check_smoother_status(s, TRUE, FALSE, FALSE);
521     request_auto_timing_update(s, TRUE);
522
523     if (s->suspended_callback)
524         s->suspended_callback(s, s->suspended_userdata);
525
526 finish:
527     pa_context_unref(c);
528 }
529
530 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
531     pa_context *c = userdata;
532     pa_stream *s;
533     uint32_t channel;
534
535     pa_assert(pd);
536     pa_assert(command == PA_COMMAND_STARTED);
537     pa_assert(t);
538     pa_assert(c);
539     pa_assert(PA_REFCNT_VALUE(c) >= 1);
540
541     pa_context_ref(c);
542
543     if (c->version < 13) {
544         pa_context_fail(c, PA_ERR_PROTOCOL);
545         goto finish;
546     }
547
548     if (pa_tagstruct_getu32(t, &channel) < 0 ||
549         !pa_tagstruct_eof(t)) {
550         pa_context_fail(c, PA_ERR_PROTOCOL);
551         goto finish;
552     }
553
554     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
555         goto finish;
556
557     if (s->state != PA_STREAM_READY)
558         goto finish;
559
560     check_smoother_status(s, TRUE, TRUE, FALSE);
561     request_auto_timing_update(s, TRUE);
562
563     if (s->started_callback)
564         s->started_callback(s, s->started_userdata);
565
566 finish:
567     pa_context_unref(c);
568 }
569
570 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
571     pa_context *c = userdata;
572     pa_stream *s;
573     uint32_t channel;
574     pa_proplist *pl = NULL;
575     const char *event;
576
577     pa_assert(pd);
578     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
579     pa_assert(t);
580     pa_assert(c);
581     pa_assert(PA_REFCNT_VALUE(c) >= 1);
582
583     pa_context_ref(c);
584
585     if (c->version < 15) {
586         pa_context_fail(c, PA_ERR_PROTOCOL);
587         goto finish;
588     }
589
590     pl = pa_proplist_new();
591
592     if (pa_tagstruct_getu32(t, &channel) < 0 ||
593         pa_tagstruct_gets(t, &event) < 0 ||
594         pa_tagstruct_get_proplist(t, pl) < 0 ||
595         !pa_tagstruct_eof(t) || !event) {
596         pa_context_fail(c, PA_ERR_PROTOCOL);
597         goto finish;
598     }
599
600     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, channel)))
601         goto finish;
602
603     if (s->state != PA_STREAM_READY)
604         goto finish;
605
606     if (s->event_callback)
607         s->event_callback(s, event, pl, s->event_userdata);
608
609 finish:
610     pa_context_unref(c);
611
612     if (pl)
613         pa_proplist_free(pl);
614 }
615
616 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
617     pa_stream *s;
618     pa_context *c = userdata;
619     uint32_t bytes, channel;
620
621     pa_assert(pd);
622     pa_assert(command == PA_COMMAND_REQUEST);
623     pa_assert(t);
624     pa_assert(c);
625     pa_assert(PA_REFCNT_VALUE(c) >= 1);
626
627     pa_context_ref(c);
628
629     if (pa_tagstruct_getu32(t, &channel) < 0 ||
630         pa_tagstruct_getu32(t, &bytes) < 0 ||
631         !pa_tagstruct_eof(t)) {
632         pa_context_fail(c, PA_ERR_PROTOCOL);
633         goto finish;
634     }
635
636     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
637         goto finish;
638
639     if (s->state != PA_STREAM_READY)
640         goto finish;
641
642     s->requested_bytes += bytes;
643
644     if (s->requested_bytes > 0 && s->write_callback)
645         s->write_callback(s, s->requested_bytes, s->write_userdata);
646
647 finish:
648     pa_context_unref(c);
649 }
650
651 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
652     pa_stream *s;
653     pa_context *c = userdata;
654     uint32_t channel;
655
656     pa_assert(pd);
657     pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
658     pa_assert(t);
659     pa_assert(c);
660     pa_assert(PA_REFCNT_VALUE(c) >= 1);
661
662     pa_context_ref(c);
663
664     if (pa_tagstruct_getu32(t, &channel) < 0 ||
665         !pa_tagstruct_eof(t)) {
666         pa_context_fail(c, PA_ERR_PROTOCOL);
667         goto finish;
668     }
669
670     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
671         goto finish;
672
673     if (s->state != PA_STREAM_READY)
674         goto finish;
675
676     if (s->buffer_attr.prebuf > 0)
677         check_smoother_status(s, TRUE, FALSE, TRUE);
678
679     request_auto_timing_update(s, TRUE);
680
681     if (command == PA_COMMAND_OVERFLOW) {
682         if (s->overflow_callback)
683             s->overflow_callback(s, s->overflow_userdata);
684     } else if (command == PA_COMMAND_UNDERFLOW) {
685         if (s->underflow_callback)
686             s->underflow_callback(s, s->underflow_userdata);
687     }
688
689  finish:
690     pa_context_unref(c);
691 }
692
693 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
694     pa_assert(s);
695     pa_assert(PA_REFCNT_VALUE(s) >= 1);
696
697 /*     pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
698
699     if (s->state != PA_STREAM_READY)
700         return;
701
702     if (w) {
703         s->write_index_not_before = s->context->ctag;
704
705         if (s->timing_info_valid)
706             s->timing_info.write_index_corrupt = TRUE;
707
708 /*         pa_log("write_index invalidated"); */
709     }
710
711     if (r) {
712         s->read_index_not_before = s->context->ctag;
713
714         if (s->timing_info_valid)
715             s->timing_info.read_index_corrupt = TRUE;
716
717 /*         pa_log("read_index invalidated"); */
718     }
719
720     request_auto_timing_update(s, TRUE);
721 }
722
723 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *tv, void *userdata) {
724     pa_stream *s = userdata;
725
726     pa_assert(s);
727     pa_assert(PA_REFCNT_VALUE(s) >= 1);
728
729     pa_stream_ref(s);
730     request_auto_timing_update(s, FALSE);
731     pa_stream_unref(s);
732 }
733
734 static void create_stream_complete(pa_stream *s) {
735     pa_assert(s);
736     pa_assert(PA_REFCNT_VALUE(s) >= 1);
737     pa_assert(s->state == PA_STREAM_CREATING);
738
739     pa_stream_set_state(s, PA_STREAM_READY);
740
741     if (s->requested_bytes > 0 && s->write_callback)
742         s->write_callback(s, s->requested_bytes, s->write_userdata);
743
744     if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
745         struct timeval tv;
746         pa_gettimeofday(&tv);
747         tv.tv_usec += (suseconds_t) LATENCY_IPOL_INTERVAL_USEC; /* every 100 ms */
748         pa_assert(!s->auto_timing_update_event);
749         s->auto_timing_update_event = s->mainloop->time_new(s->mainloop, &tv, &auto_timing_update_callback, s);
750
751         request_auto_timing_update(s, TRUE);
752     }
753
754     check_smoother_status(s, TRUE, FALSE, FALSE);
755 }
756
757 static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_sample_spec *ss) {
758     pa_assert(s);
759     pa_assert(attr);
760     pa_assert(ss);
761
762     if (s->context->version >= 13)
763         return;
764
765     /* Version older than 0.9.10 didn't do server side buffer_attr
766      * selection, hence we have to fake it on the client side. */
767
768     /* We choose fairly conservative values here, to not confuse
769      * old clients with extremely large playback buffers */
770
771     if (attr->maxlength == (uint32_t) -1)
772         attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
773
774     if (attr->tlength == (uint32_t) -1)
775         attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
776
777     if (attr->minreq == (uint32_t) -1)
778         attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
779
780     if (attr->prebuf == (uint32_t) -1)
781         attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
782
783     if (attr->fragsize == (uint32_t) -1)
784         attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
785 }
786
787 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
788     pa_stream *s = userdata;
789
790     pa_assert(pd);
791     pa_assert(s);
792     pa_assert(PA_REFCNT_VALUE(s) >= 1);
793     pa_assert(s->state == PA_STREAM_CREATING);
794
795     pa_stream_ref(s);
796
797     if (command != PA_COMMAND_REPLY) {
798         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
799             goto finish;
800
801         pa_stream_set_state(s, PA_STREAM_FAILED);
802         goto finish;
803     }
804
805     if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
806         s->channel == PA_INVALID_INDEX ||
807         ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 ||  s->stream_index == PA_INVALID_INDEX)) ||
808         ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &s->requested_bytes) < 0)) {
809         pa_context_fail(s->context, PA_ERR_PROTOCOL);
810         goto finish;
811     }
812
813     if (s->context->version >= 9) {
814         if (s->direction == PA_STREAM_PLAYBACK) {
815             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
816                 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
817                 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
818                 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
819                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
820                 goto finish;
821             }
822         } else if (s->direction == PA_STREAM_RECORD) {
823             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
824                 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
825                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
826                 goto finish;
827             }
828         }
829     }
830
831     if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
832         pa_sample_spec ss;
833         pa_channel_map cm;
834         const char *dn = NULL;
835         pa_bool_t suspended;
836
837         if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
838             pa_tagstruct_get_channel_map(t, &cm) < 0 ||
839             pa_tagstruct_getu32(t, &s->device_index) < 0 ||
840             pa_tagstruct_gets(t, &dn) < 0 ||
841             pa_tagstruct_get_boolean(t, &suspended) < 0) {
842             pa_context_fail(s->context, PA_ERR_PROTOCOL);
843             goto finish;
844         }
845
846         if (!dn || s->device_index == PA_INVALID_INDEX ||
847             ss.channels != cm.channels ||
848             !pa_channel_map_valid(&cm) ||
849             !pa_sample_spec_valid(&ss) ||
850             (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
851             (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
852             (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))) {
853             pa_context_fail(s->context, PA_ERR_PROTOCOL);
854             goto finish;
855         }
856
857         pa_xfree(s->device_name);
858         s->device_name = pa_xstrdup(dn);
859         s->suspended = suspended;
860
861         s->channel_map = cm;
862         s->sample_spec = ss;
863     }
864
865     if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
866         pa_usec_t usec;
867
868         if (pa_tagstruct_get_usec(t, &usec) < 0) {
869             pa_context_fail(s->context, PA_ERR_PROTOCOL);
870             goto finish;
871         }
872
873         if (s->direction == PA_STREAM_RECORD)
874             s->timing_info.configured_source_usec = usec;
875         else
876             s->timing_info.configured_sink_usec = usec;
877     }
878
879     if (!pa_tagstruct_eof(t)) {
880         pa_context_fail(s->context, PA_ERR_PROTOCOL);
881         goto finish;
882     }
883
884     if (s->direction == PA_STREAM_RECORD) {
885         pa_assert(!s->record_memblockq);
886
887         s->record_memblockq = pa_memblockq_new(
888                 0,
889                 s->buffer_attr.maxlength,
890                 0,
891                 pa_frame_size(&s->sample_spec),
892                 1,
893                 0,
894                 0,
895                 NULL);
896     }
897
898     s->channel_valid = TRUE;
899     pa_dynarray_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, s->channel, s);
900
901     create_stream_complete(s);
902
903 finish:
904     pa_stream_unref(s);
905 }
906
907 static int create_stream(
908         pa_stream_direction_t direction,
909         pa_stream *s,
910         const char *dev,
911         const pa_buffer_attr *attr,
912         pa_stream_flags_t flags,
913         const pa_cvolume *volume,
914         pa_stream *sync_stream) {
915
916     pa_tagstruct *t;
917     uint32_t tag;
918     pa_bool_t volume_set = FALSE;
919
920     pa_assert(s);
921     pa_assert(PA_REFCNT_VALUE(s) >= 1);
922     pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
923
924     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
925     PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
926     PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
927                                               PA_STREAM_INTERPOLATE_TIMING|
928                                               PA_STREAM_NOT_MONOTONIC|
929                                               PA_STREAM_AUTO_TIMING_UPDATE|
930                                               PA_STREAM_NO_REMAP_CHANNELS|
931                                               PA_STREAM_NO_REMIX_CHANNELS|
932                                               PA_STREAM_FIX_FORMAT|
933                                               PA_STREAM_FIX_RATE|
934                                               PA_STREAM_FIX_CHANNELS|
935                                               PA_STREAM_DONT_MOVE|
936                                               PA_STREAM_VARIABLE_RATE|
937                                               PA_STREAM_PEAK_DETECT|
938                                               PA_STREAM_START_MUTED|
939                                               PA_STREAM_ADJUST_LATENCY|
940                                               PA_STREAM_EARLY_REQUESTS|
941                                               PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
942                                               PA_STREAM_START_UNMUTED|
943                                               PA_STREAM_FAIL_ON_SUSPEND)), PA_ERR_INVALID);
944
945     PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
946     PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
947     /* Althought some of the other flags are not supported on older
948      * version, we don't check for them here, because it doesn't hurt
949      * when they are passed but actually not supported. This makes
950      * client development easier */
951
952     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
953     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
954     PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
955     PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
956     PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
957
958     pa_stream_ref(s);
959
960     s->direction = direction;
961     s->flags = flags;
962     s->corked = !!(flags & PA_STREAM_START_CORKED);
963
964     if (sync_stream)
965         s->syncid = sync_stream->syncid;
966
967     if (attr)
968         s->buffer_attr = *attr;
969     automatic_buffer_attr(s, &s->buffer_attr, &s->sample_spec);
970
971     if (flags & PA_STREAM_INTERPOLATE_TIMING) {
972         pa_usec_t x;
973
974         if (s->smoother)
975             pa_smoother_free(s->smoother);
976
977         s->smoother = pa_smoother_new(SMOOTHER_ADJUST_TIME, SMOOTHER_HISTORY_TIME, !(flags & PA_STREAM_NOT_MONOTONIC), SMOOTHER_MIN_HISTORY);
978
979         x = pa_rtclock_usec();
980         pa_smoother_set_time_offset(s->smoother, x);
981         pa_smoother_pause(s->smoother, x);
982     }
983
984     if (!dev)
985         dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
986
987     t = pa_tagstruct_command(
988             s->context,
989             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
990             &tag);
991
992     if (s->context->version < 13)
993         pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
994
995     pa_tagstruct_put(
996             t,
997             PA_TAG_SAMPLE_SPEC, &s->sample_spec,
998             PA_TAG_CHANNEL_MAP, &s->channel_map,
999             PA_TAG_U32, PA_INVALID_INDEX,
1000             PA_TAG_STRING, dev,
1001             PA_TAG_U32, s->buffer_attr.maxlength,
1002             PA_TAG_BOOLEAN, s->corked,
1003             PA_TAG_INVALID);
1004
1005     if (s->direction == PA_STREAM_PLAYBACK) {
1006         pa_cvolume cv;
1007
1008         pa_tagstruct_put(
1009                 t,
1010                 PA_TAG_U32, s->buffer_attr.tlength,
1011                 PA_TAG_U32, s->buffer_attr.prebuf,
1012                 PA_TAG_U32, s->buffer_attr.minreq,
1013                 PA_TAG_U32, s->syncid,
1014                 PA_TAG_INVALID);
1015
1016         volume_set = !!volume;
1017
1018         if (!volume)
1019             volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1020
1021         pa_tagstruct_put_cvolume(t, volume);
1022     } else
1023         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1024
1025     if (s->context->version >= 12) {
1026         pa_tagstruct_put(
1027                 t,
1028                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1029                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1030                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1031                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1032                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1033                 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1034                 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1035                 PA_TAG_INVALID);
1036     }
1037
1038     if (s->context->version >= 13) {
1039
1040         if (s->direction == PA_STREAM_PLAYBACK)
1041             pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1042         else
1043             pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1044
1045         pa_tagstruct_put(
1046                 t,
1047                 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1048                 PA_TAG_PROPLIST, s->proplist,
1049                 PA_TAG_INVALID);
1050
1051         if (s->direction == PA_STREAM_RECORD)
1052             pa_tagstruct_putu32(t, s->direct_on_input);
1053     }
1054
1055     if (s->context->version >= 14) {
1056
1057         if (s->direction == PA_STREAM_PLAYBACK)
1058             pa_tagstruct_put_boolean(t, volume_set);
1059
1060         pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1061     }
1062
1063     if (s->context->version >= 15) {
1064
1065         if (s->direction == PA_STREAM_PLAYBACK)
1066             pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1067
1068         pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1069         pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1070     }
1071
1072     pa_pstream_send_tagstruct(s->context->pstream, t);
1073     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1074
1075     pa_stream_set_state(s, PA_STREAM_CREATING);
1076
1077     pa_stream_unref(s);
1078     return 0;
1079 }
1080
1081 int pa_stream_connect_playback(
1082         pa_stream *s,
1083         const char *dev,
1084         const pa_buffer_attr *attr,
1085         pa_stream_flags_t flags,
1086         pa_cvolume *volume,
1087         pa_stream *sync_stream) {
1088
1089     pa_assert(s);
1090     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1091
1092     return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1093 }
1094
1095 int pa_stream_connect_record(
1096         pa_stream *s,
1097         const char *dev,
1098         const pa_buffer_attr *attr,
1099         pa_stream_flags_t flags) {
1100
1101     pa_assert(s);
1102     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1103
1104     return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1105 }
1106
1107 int pa_stream_write(
1108         pa_stream *s,
1109         const void *data,
1110         size_t length,
1111         void (*free_cb)(void *p),
1112         int64_t offset,
1113         pa_seek_mode_t seek) {
1114
1115     pa_memchunk chunk;
1116     pa_seek_mode_t t_seek;
1117     int64_t t_offset;
1118     size_t t_length;
1119     const void *t_data;
1120
1121     pa_assert(s);
1122     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1123     pa_assert(data);
1124
1125     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1126     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1127     PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1128     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1129
1130     if (length <= 0)
1131         return 0;
1132
1133     t_seek = seek;
1134     t_offset = offset;
1135     t_length = length;
1136     t_data = data;
1137
1138     while (t_length > 0) {
1139
1140         chunk.index = 0;
1141
1142         if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1143             chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1144             chunk.length = t_length;
1145         } else {
1146             void *d;
1147
1148             chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1149             chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1150
1151             d = pa_memblock_acquire(chunk.memblock);
1152             memcpy(d, t_data, chunk.length);
1153             pa_memblock_release(chunk.memblock);
1154         }
1155
1156         pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1157
1158         t_offset = 0;
1159         t_seek = PA_SEEK_RELATIVE;
1160
1161         t_data = (const uint8_t*) t_data + chunk.length;
1162         t_length -= chunk.length;
1163
1164         pa_memblock_unref(chunk.memblock);
1165     }
1166
1167     if (free_cb && pa_pstream_get_shm(s->context->pstream))
1168         free_cb((void*) data);
1169
1170     if (length < s->requested_bytes)
1171         s->requested_bytes -= (uint32_t) length;
1172     else
1173         s->requested_bytes = 0;
1174
1175     /* FIXME!!! ^^^ will break when offset is != 0 and mode is not RELATIVE*/
1176
1177     if (s->direction == PA_STREAM_PLAYBACK) {
1178
1179         /* Update latency request correction */
1180         if (s->write_index_corrections[s->current_write_index_correction].valid) {
1181
1182             if (seek == PA_SEEK_ABSOLUTE) {
1183                 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1184                 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1185                 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1186             } else if (seek == PA_SEEK_RELATIVE) {
1187                 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1188                     s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1189             } else
1190                 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1191         }
1192
1193         /* Update the write index in the already available latency data */
1194         if (s->timing_info_valid) {
1195
1196             if (seek == PA_SEEK_ABSOLUTE) {
1197                 s->timing_info.write_index_corrupt = FALSE;
1198                 s->timing_info.write_index = offset + (int64_t) length;
1199             } else if (seek == PA_SEEK_RELATIVE) {
1200                 if (!s->timing_info.write_index_corrupt)
1201                     s->timing_info.write_index += offset + (int64_t) length;
1202             } else
1203                 s->timing_info.write_index_corrupt = TRUE;
1204         }
1205
1206         if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1207             request_auto_timing_update(s, TRUE);
1208     }
1209
1210     return 0;
1211 }
1212
1213 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1214     pa_assert(s);
1215     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1216     pa_assert(data);
1217     pa_assert(length);
1218
1219     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1220     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1221
1222     if (!s->peek_memchunk.memblock) {
1223
1224         if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1225             *data = NULL;
1226             *length = 0;
1227             return 0;
1228         }
1229
1230         s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1231     }
1232
1233     pa_assert(s->peek_data);
1234     *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1235     *length = s->peek_memchunk.length;
1236     return 0;
1237 }
1238
1239 int pa_stream_drop(pa_stream *s) {
1240     pa_assert(s);
1241     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1242
1243     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1244     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1245     PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1246
1247     pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1248
1249     /* Fix the simulated local read index */
1250     if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1251         s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1252
1253     pa_assert(s->peek_data);
1254     pa_memblock_release(s->peek_memchunk.memblock);
1255     pa_memblock_unref(s->peek_memchunk.memblock);
1256     pa_memchunk_reset(&s->peek_memchunk);
1257
1258     return 0;
1259 }
1260
1261 size_t pa_stream_writable_size(pa_stream *s) {
1262     pa_assert(s);
1263     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1264
1265     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1266     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1267
1268     return s->requested_bytes;
1269 }
1270
1271 size_t pa_stream_readable_size(pa_stream *s) {
1272     pa_assert(s);
1273     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1274
1275     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1276     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1277
1278     return pa_memblockq_get_length(s->record_memblockq);
1279 }
1280
1281 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1282     pa_operation *o;
1283     pa_tagstruct *t;
1284     uint32_t tag;
1285
1286     pa_assert(s);
1287     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1288
1289     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1290     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1291
1292     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1293
1294     t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1295     pa_tagstruct_putu32(t, s->channel);
1296     pa_pstream_send_tagstruct(s->context->pstream, t);
1297     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1298
1299     return o;
1300 }
1301
1302 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1303     pa_usec_t usec;
1304
1305     pa_assert(s);
1306     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1307     pa_assert(s->state == PA_STREAM_READY);
1308     pa_assert(s->direction != PA_STREAM_UPLOAD);
1309     pa_assert(s->timing_info_valid);
1310     pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1311     pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1312
1313     if (s->direction == PA_STREAM_PLAYBACK) {
1314         /* The last byte that was written into the output device
1315          * had this time value associated */
1316         usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1317
1318         if (!s->corked && !s->suspended) {
1319
1320             if (!ignore_transport)
1321                 /* Because the latency info took a little time to come
1322                  * to us, we assume that the real output time is actually
1323                  * a little ahead */
1324                 usec += s->timing_info.transport_usec;
1325
1326             /* However, the output device usually maintains a buffer
1327                too, hence the real sample currently played is a little
1328                back  */
1329             if (s->timing_info.sink_usec >= usec)
1330                 usec = 0;
1331             else
1332                 usec -= s->timing_info.sink_usec;
1333         }
1334
1335     } else {
1336         pa_assert(s->direction == PA_STREAM_RECORD);
1337
1338         /* The last byte written into the server side queue had
1339          * this time value associated */
1340         usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1341
1342         if (!s->corked && !s->suspended) {
1343
1344             if (!ignore_transport)
1345                 /* Add transport latency */
1346                 usec += s->timing_info.transport_usec;
1347
1348             /* Add latency of data in device buffer */
1349             usec += s->timing_info.source_usec;
1350
1351             /* If this is a monitor source, we need to correct the
1352              * time by the playback device buffer */
1353             if (s->timing_info.sink_usec >= usec)
1354                 usec = 0;
1355             else
1356                 usec -= s->timing_info.sink_usec;
1357         }
1358     }
1359
1360     return usec;
1361 }
1362
1363 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1364     pa_operation *o = userdata;
1365     struct timeval local, remote, now;
1366     pa_timing_info *i;
1367     pa_bool_t playing = FALSE;
1368     uint64_t underrun_for = 0, playing_for = 0;
1369
1370     pa_assert(pd);
1371     pa_assert(o);
1372     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1373
1374     if (!o->context || !o->stream)
1375         goto finish;
1376
1377     i = &o->stream->timing_info;
1378
1379     o->stream->timing_info_valid = FALSE;
1380     i->write_index_corrupt = TRUE;
1381     i->read_index_corrupt = TRUE;
1382
1383     if (command != PA_COMMAND_REPLY) {
1384         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1385             goto finish;
1386
1387     } else {
1388
1389         if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1390             pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1391             pa_tagstruct_get_boolean(t, &playing) < 0 ||
1392             pa_tagstruct_get_timeval(t, &local) < 0 ||
1393             pa_tagstruct_get_timeval(t, &remote) < 0 ||
1394             pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1395             pa_tagstruct_gets64(t, &i->read_index) < 0) {
1396
1397             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1398             goto finish;
1399         }
1400
1401         if (o->context->version >= 13 &&
1402             o->stream->direction == PA_STREAM_PLAYBACK)
1403             if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1404                 pa_tagstruct_getu64(t, &playing_for) < 0) {
1405
1406                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1407                 goto finish;
1408             }
1409
1410
1411         if (!pa_tagstruct_eof(t)) {
1412             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1413             goto finish;
1414         }
1415         o->stream->timing_info_valid = TRUE;
1416         i->write_index_corrupt = FALSE;
1417         i->read_index_corrupt = FALSE;
1418
1419         i->playing = (int) playing;
1420         i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1421
1422         pa_gettimeofday(&now);
1423
1424         /* Calculcate timestamps */
1425         if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1426             /* local and remote seem to have synchronized clocks */
1427
1428             if (o->stream->direction == PA_STREAM_PLAYBACK)
1429                 i->transport_usec = pa_timeval_diff(&remote, &local);
1430             else
1431                 i->transport_usec = pa_timeval_diff(&now, &remote);
1432
1433             i->synchronized_clocks = TRUE;
1434             i->timestamp = remote;
1435         } else {
1436             /* clocks are not synchronized, let's estimate latency then */
1437             i->transport_usec = pa_timeval_diff(&now, &local)/2;
1438             i->synchronized_clocks = FALSE;
1439             i->timestamp = local;
1440             pa_timeval_add(&i->timestamp, i->transport_usec);
1441         }
1442
1443         /* Invalidate read and write indexes if necessary */
1444         if (tag < o->stream->read_index_not_before)
1445             i->read_index_corrupt = TRUE;
1446
1447         if (tag < o->stream->write_index_not_before)
1448             i->write_index_corrupt = TRUE;
1449
1450         if (o->stream->direction == PA_STREAM_PLAYBACK) {
1451             /* Write index correction */
1452
1453             int n, j;
1454             uint32_t ctag = tag;
1455
1456             /* Go through the saved correction values and add up the
1457              * total correction.*/
1458             for (n = 0, j = o->stream->current_write_index_correction+1;
1459                  n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1460                  n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1461
1462                 /* Step over invalid data or out-of-date data */
1463                 if (!o->stream->write_index_corrections[j].valid ||
1464                     o->stream->write_index_corrections[j].tag < ctag)
1465                     continue;
1466
1467                 /* Make sure that everything is in order */
1468                 ctag = o->stream->write_index_corrections[j].tag+1;
1469
1470                 /* Now fix the write index */
1471                 if (o->stream->write_index_corrections[j].corrupt) {
1472                     /* A corrupting seek was made */
1473                     i->write_index_corrupt = TRUE;
1474                 } else if (o->stream->write_index_corrections[j].absolute) {
1475                     /* An absolute seek was made */
1476                     i->write_index = o->stream->write_index_corrections[j].value;
1477                     i->write_index_corrupt = FALSE;
1478                 } else if (!i->write_index_corrupt) {
1479                     /* A relative seek was made */
1480                     i->write_index += o->stream->write_index_corrections[j].value;
1481                 }
1482             }
1483
1484             /* Clear old correction entries */
1485             for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1486                 if (!o->stream->write_index_corrections[n].valid)
1487                     continue;
1488
1489                 if (o->stream->write_index_corrections[n].tag <= tag)
1490                     o->stream->write_index_corrections[n].valid = FALSE;
1491             }
1492         }
1493
1494         if (o->stream->direction == PA_STREAM_RECORD) {
1495             /* Read index correction */
1496
1497             if (!i->read_index_corrupt)
1498                 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1499         }
1500
1501         /* Update smoother */
1502         if (o->stream->smoother) {
1503             pa_usec_t u, x;
1504
1505             u = x = pa_rtclock_usec() - i->transport_usec;
1506
1507             if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1508                 pa_usec_t su;
1509
1510                 /* If we weren't playing then it will take some time
1511                  * until the audio will actually come out through the
1512                  * speakers. Since we follow that timing here, we need
1513                  * to try to fix this up */
1514
1515                 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1516
1517                 if (su < i->sink_usec)
1518                     x += i->sink_usec - su;
1519             }
1520
1521             if (!i->playing)
1522                 pa_smoother_pause(o->stream->smoother, x);
1523
1524             /* Update the smoother */
1525             if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1526                 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1527                 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1528
1529             if (i->playing)
1530                 pa_smoother_resume(o->stream->smoother, x);
1531         }
1532     }
1533
1534     o->stream->auto_timing_update_requested = FALSE;
1535
1536     if (o->stream->latency_update_callback)
1537         o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1538
1539     if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1540         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1541         cb(o->stream, o->stream->timing_info_valid, o->userdata);
1542     }
1543
1544 finish:
1545
1546     pa_operation_done(o);
1547     pa_operation_unref(o);
1548 }
1549
1550 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1551     uint32_t tag;
1552     pa_operation *o;
1553     pa_tagstruct *t;
1554     struct timeval now;
1555     int cidx = 0;
1556
1557     pa_assert(s);
1558     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1559
1560     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1561     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1562
1563     if (s->direction == PA_STREAM_PLAYBACK) {
1564         /* Find a place to store the write_index correction data for this entry */
1565         cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1566
1567         /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1568         PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1569     }
1570     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1571
1572     t = pa_tagstruct_command(
1573             s->context,
1574             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1575             &tag);
1576     pa_tagstruct_putu32(t, s->channel);
1577     pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1578
1579     pa_pstream_send_tagstruct(s->context->pstream, t);
1580     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1581
1582     if (s->direction == PA_STREAM_PLAYBACK) {
1583         /* Fill in initial correction data */
1584
1585         s->current_write_index_correction = cidx;
1586
1587         s->write_index_corrections[cidx].valid = TRUE;
1588         s->write_index_corrections[cidx].absolute = FALSE;
1589         s->write_index_corrections[cidx].corrupt = FALSE;
1590         s->write_index_corrections[cidx].tag = tag;
1591         s->write_index_corrections[cidx].value = 0;
1592     }
1593
1594     return o;
1595 }
1596
1597 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1598     pa_stream *s = userdata;
1599
1600     pa_assert(pd);
1601     pa_assert(s);
1602     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1603
1604     pa_stream_ref(s);
1605
1606     if (command != PA_COMMAND_REPLY) {
1607         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1608             goto finish;
1609
1610         pa_stream_set_state(s, PA_STREAM_FAILED);
1611         goto finish;
1612     } else if (!pa_tagstruct_eof(t)) {
1613         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1614         goto finish;
1615     }
1616
1617     pa_stream_set_state(s, PA_STREAM_TERMINATED);
1618
1619 finish:
1620     pa_stream_unref(s);
1621 }
1622
1623 int pa_stream_disconnect(pa_stream *s) {
1624     pa_tagstruct *t;
1625     uint32_t tag;
1626
1627     pa_assert(s);
1628     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1629
1630     PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1631     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1632
1633     pa_stream_ref(s);
1634
1635     t = pa_tagstruct_command(
1636             s->context,
1637             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1638                         (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1639             &tag);
1640     pa_tagstruct_putu32(t, s->channel);
1641     pa_pstream_send_tagstruct(s->context->pstream, t);
1642     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1643
1644     pa_stream_unref(s);
1645     return 0;
1646 }
1647
1648 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1649     pa_assert(s);
1650     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1651
1652     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1653         return;
1654
1655     s->read_callback = cb;
1656     s->read_userdata = userdata;
1657 }
1658
1659 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1660     pa_assert(s);
1661     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1662
1663     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1664         return;
1665
1666     s->write_callback = cb;
1667     s->write_userdata = userdata;
1668 }
1669
1670 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1671     pa_assert(s);
1672     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1673
1674     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1675         return;
1676
1677     s->state_callback = cb;
1678     s->state_userdata = userdata;
1679 }
1680
1681 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1682     pa_assert(s);
1683     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1684
1685     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1686         return;
1687
1688     s->overflow_callback = cb;
1689     s->overflow_userdata = userdata;
1690 }
1691
1692 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1693     pa_assert(s);
1694     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1695
1696     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1697         return;
1698
1699     s->underflow_callback = cb;
1700     s->underflow_userdata = userdata;
1701 }
1702
1703 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1704     pa_assert(s);
1705     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1706
1707     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1708         return;
1709
1710     s->latency_update_callback = cb;
1711     s->latency_update_userdata = userdata;
1712 }
1713
1714 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1715     pa_assert(s);
1716     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1717
1718     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1719         return;
1720
1721     s->moved_callback = cb;
1722     s->moved_userdata = userdata;
1723 }
1724
1725 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1726     pa_assert(s);
1727     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1728
1729     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1730         return;
1731
1732     s->suspended_callback = cb;
1733     s->suspended_userdata = userdata;
1734 }
1735
1736 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1737     pa_assert(s);
1738     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1739
1740     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1741         return;
1742
1743     s->started_callback = cb;
1744     s->started_userdata = userdata;
1745 }
1746
1747 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
1748     pa_assert(s);
1749     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1750
1751     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1752         return;
1753
1754     s->event_callback = cb;
1755     s->event_userdata = userdata;
1756 }
1757
1758 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1759     pa_operation *o = userdata;
1760     int success = 1;
1761
1762     pa_assert(pd);
1763     pa_assert(o);
1764     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1765
1766     if (!o->context)
1767         goto finish;
1768
1769     if (command != PA_COMMAND_REPLY) {
1770         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1771             goto finish;
1772
1773         success = 0;
1774     } else if (!pa_tagstruct_eof(t)) {
1775         pa_context_fail(o->context, PA_ERR_PROTOCOL);
1776         goto finish;
1777     }
1778
1779     if (o->callback) {
1780         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1781         cb(o->stream, success, o->userdata);
1782     }
1783
1784 finish:
1785     pa_operation_done(o);
1786     pa_operation_unref(o);
1787 }
1788
1789 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
1790     pa_operation *o;
1791     pa_tagstruct *t;
1792     uint32_t tag;
1793
1794     pa_assert(s);
1795     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1796
1797     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1798     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1799
1800     s->corked = b;
1801
1802     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1803
1804     t = pa_tagstruct_command(
1805             s->context,
1806             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
1807             &tag);
1808     pa_tagstruct_putu32(t, s->channel);
1809     pa_tagstruct_put_boolean(t, !!b);
1810     pa_pstream_send_tagstruct(s->context->pstream, t);
1811     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1812
1813     check_smoother_status(s, FALSE, FALSE, FALSE);
1814
1815     /* This might cause the indexes to hang/start again, hence
1816      * let's request a timing update */
1817     request_auto_timing_update(s, TRUE);
1818
1819     return o;
1820 }
1821
1822 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
1823     pa_tagstruct *t;
1824     pa_operation *o;
1825     uint32_t tag;
1826
1827     pa_assert(s);
1828     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1829
1830     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1831
1832     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1833
1834     t = pa_tagstruct_command(s->context, command, &tag);
1835     pa_tagstruct_putu32(t, s->channel);
1836     pa_pstream_send_tagstruct(s->context->pstream, t);
1837     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1838
1839     return o;
1840 }
1841
1842 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1843     pa_operation *o;
1844
1845     pa_assert(s);
1846     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1847
1848     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1849     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1850
1851     if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
1852         return NULL;
1853
1854     if (s->direction == PA_STREAM_PLAYBACK) {
1855
1856         if (s->write_index_corrections[s->current_write_index_correction].valid)
1857             s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1858
1859         if (s->buffer_attr.prebuf > 0)
1860             check_smoother_status(s, FALSE, FALSE, TRUE);
1861
1862         /* This will change the write index, but leave the
1863          * read index untouched. */
1864         invalidate_indexes(s, FALSE, TRUE);
1865
1866     } else
1867         /* For record streams this has no influence on the write
1868          * index, but the read index might jump. */
1869         invalidate_indexes(s, TRUE, FALSE);
1870
1871     return o;
1872 }
1873
1874 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1875     pa_operation *o;
1876
1877     pa_assert(s);
1878     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1879
1880     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1881     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1882     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
1883
1884     if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
1885         return NULL;
1886
1887     /* This might cause the read index to hang again, hence
1888      * let's request a timing update */
1889     request_auto_timing_update(s, TRUE);
1890
1891     return o;
1892 }
1893
1894 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1895     pa_operation *o;
1896
1897     pa_assert(s);
1898     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1899
1900     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1901     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1902     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
1903
1904     if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
1905         return NULL;
1906
1907     /* This might cause the read index to start moving again, hence
1908      * let's request a timing update */
1909     request_auto_timing_update(s, TRUE);
1910
1911     return o;
1912 }
1913
1914 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
1915     pa_operation *o;
1916
1917     pa_assert(s);
1918     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1919     pa_assert(name);
1920
1921     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1922     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1923
1924     if (s->context->version >= 13) {
1925         pa_proplist *p = pa_proplist_new();
1926
1927         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
1928         o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
1929         pa_proplist_free(p);
1930     } else {
1931         pa_tagstruct *t;
1932         uint32_t tag;
1933
1934         o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1935         t = pa_tagstruct_command(
1936                 s->context,
1937                 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
1938                 &tag);
1939         pa_tagstruct_putu32(t, s->channel);
1940         pa_tagstruct_puts(t, name);
1941         pa_pstream_send_tagstruct(s->context->pstream, t);
1942         pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1943     }
1944
1945     return o;
1946 }
1947
1948 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
1949     pa_usec_t usec;
1950
1951     pa_assert(s);
1952     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1953
1954     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1955     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1956     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
1957     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
1958     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
1959
1960     if (s->smoother)
1961         usec = pa_smoother_get(s->smoother, pa_rtclock_usec());
1962     else
1963         usec = calc_time(s, FALSE);
1964
1965     /* Make sure the time runs monotonically */
1966     if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
1967         if (usec < s->previous_time)
1968             usec = s->previous_time;
1969         else
1970             s->previous_time = usec;
1971     }
1972
1973     if (r_usec)
1974         *r_usec = usec;
1975
1976     return 0;
1977 }
1978
1979 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
1980     pa_assert(s);
1981     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1982
1983     if (negative)
1984         *negative = 0;
1985
1986     if (a >= b)
1987         return a-b;
1988     else {
1989         if (negative && s->direction == PA_STREAM_RECORD) {
1990             *negative = 1;
1991             return b-a;
1992         } else
1993             return 0;
1994     }
1995 }
1996
1997 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
1998     pa_usec_t t, c;
1999     int r;
2000     int64_t cindex;
2001
2002     pa_assert(s);
2003     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2004     pa_assert(r_usec);
2005
2006     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2007     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2008     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2009     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2010     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2011
2012     if ((r = pa_stream_get_time(s, &t)) < 0)
2013         return r;
2014
2015     if (s->direction == PA_STREAM_PLAYBACK)
2016         cindex = s->timing_info.write_index;
2017     else
2018         cindex = s->timing_info.read_index;
2019
2020     if (cindex < 0)
2021         cindex = 0;
2022
2023     c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2024
2025     if (s->direction == PA_STREAM_PLAYBACK)
2026         *r_usec = time_counter_diff(s, c, t, negative);
2027     else
2028         *r_usec = time_counter_diff(s, t, c, negative);
2029
2030     return 0;
2031 }
2032
2033 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2034     pa_assert(s);
2035     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2036
2037     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2038     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2039     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2040
2041     return &s->timing_info;
2042 }
2043
2044 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2045     pa_assert(s);
2046     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2047
2048     return &s->sample_spec;
2049 }
2050
2051 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2052     pa_assert(s);
2053     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2054
2055     return &s->channel_map;
2056 }
2057
2058 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2059     pa_assert(s);
2060     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2061
2062     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2063     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2064     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2065
2066     return &s->buffer_attr;
2067 }
2068
2069 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2070     pa_operation *o = userdata;
2071     int success = 1;
2072
2073     pa_assert(pd);
2074     pa_assert(o);
2075     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2076
2077     if (!o->context)
2078         goto finish;
2079
2080     if (command != PA_COMMAND_REPLY) {
2081         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2082             goto finish;
2083
2084         success = 0;
2085     } else {
2086         if (o->stream->direction == PA_STREAM_PLAYBACK) {
2087             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2088                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2089                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2090                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2091                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2092                 goto finish;
2093             }
2094         } else if (o->stream->direction == PA_STREAM_RECORD) {
2095             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2096                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2097                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2098                 goto finish;
2099             }
2100         }
2101
2102         if (o->stream->context->version >= 13) {
2103             pa_usec_t usec;
2104
2105             if (pa_tagstruct_get_usec(t, &usec) < 0) {
2106                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2107                 goto finish;
2108             }
2109
2110             if (o->stream->direction == PA_STREAM_RECORD)
2111                 o->stream->timing_info.configured_source_usec = usec;
2112             else
2113                 o->stream->timing_info.configured_sink_usec = usec;
2114         }
2115
2116         if (!pa_tagstruct_eof(t)) {
2117             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2118             goto finish;
2119         }
2120     }
2121
2122     if (o->callback) {
2123         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2124         cb(o->stream, success, o->userdata);
2125     }
2126
2127 finish:
2128     pa_operation_done(o);
2129     pa_operation_unref(o);
2130 }
2131
2132
2133 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2134     pa_operation *o;
2135     pa_tagstruct *t;
2136     uint32_t tag;
2137
2138     pa_assert(s);
2139     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2140     pa_assert(attr);
2141
2142     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2143     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2144     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2145
2146     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2147
2148     t = pa_tagstruct_command(
2149             s->context,
2150             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2151             &tag);
2152     pa_tagstruct_putu32(t, s->channel);
2153
2154     pa_tagstruct_putu32(t, attr->maxlength);
2155
2156     if (s->direction == PA_STREAM_PLAYBACK)
2157         pa_tagstruct_put(
2158                 t,
2159                 PA_TAG_U32, attr->tlength,
2160                 PA_TAG_U32, attr->prebuf,
2161                 PA_TAG_U32, attr->minreq,
2162                 PA_TAG_INVALID);
2163     else
2164         pa_tagstruct_putu32(t, attr->fragsize);
2165
2166     if (s->context->version >= 13)
2167         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2168
2169     if (s->context->version >= 14)
2170         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2171
2172     pa_pstream_send_tagstruct(s->context->pstream, t);
2173     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2174
2175     /* This might cause changes in the read/write indexex, hence let's
2176      * request a timing update */
2177     request_auto_timing_update(s, TRUE);
2178
2179     return o;
2180 }
2181
2182 uint32_t pa_stream_get_device_index(pa_stream *s) {
2183     pa_assert(s);
2184     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2185
2186     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2187     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2188     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2189     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2190
2191     return s->device_index;
2192 }
2193
2194 const char *pa_stream_get_device_name(pa_stream *s) {
2195     pa_assert(s);
2196     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2197
2198     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2199     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2200     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2201     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2202
2203     return s->device_name;
2204 }
2205
2206 int pa_stream_is_suspended(pa_stream *s) {
2207     pa_assert(s);
2208     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2209
2210     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2211     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2212     PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2213
2214     return s->suspended;
2215 }
2216
2217 int pa_stream_is_corked(pa_stream *s) {
2218     pa_assert(s);
2219     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2220
2221     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2222     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2223
2224     return s->corked;
2225 }
2226
2227 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2228     pa_operation *o = userdata;
2229     int success = 1;
2230
2231     pa_assert(pd);
2232     pa_assert(o);
2233     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2234
2235     if (!o->context)
2236         goto finish;
2237
2238     if (command != PA_COMMAND_REPLY) {
2239         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2240             goto finish;
2241
2242         success = 0;
2243     } else {
2244
2245         if (!pa_tagstruct_eof(t)) {
2246             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2247             goto finish;
2248         }
2249     }
2250
2251     o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2252     pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2253
2254     if (o->callback) {
2255         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2256         cb(o->stream, success, o->userdata);
2257     }
2258
2259 finish:
2260     pa_operation_done(o);
2261     pa_operation_unref(o);
2262 }
2263
2264
2265 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2266     pa_operation *o;
2267     pa_tagstruct *t;
2268     uint32_t tag;
2269
2270     pa_assert(s);
2271     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2272
2273     PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2274     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2275     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2276     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2277     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2278
2279     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2280     o->private = PA_UINT_TO_PTR(rate);
2281
2282     t = pa_tagstruct_command(
2283             s->context,
2284             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2285             &tag);
2286     pa_tagstruct_putu32(t, s->channel);
2287     pa_tagstruct_putu32(t, rate);
2288
2289     pa_pstream_send_tagstruct(s->context->pstream, t);
2290     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2291
2292     return o;
2293 }
2294
2295 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2296     pa_operation *o;
2297     pa_tagstruct *t;
2298     uint32_t tag;
2299
2300     pa_assert(s);
2301     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2302
2303     PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2304     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2305     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2306     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2307
2308     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2309
2310     t = pa_tagstruct_command(
2311             s->context,
2312             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2313             &tag);
2314     pa_tagstruct_putu32(t, s->channel);
2315     pa_tagstruct_putu32(t, (uint32_t) mode);
2316     pa_tagstruct_put_proplist(t, p);
2317
2318     pa_pstream_send_tagstruct(s->context->pstream, t);
2319     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2320
2321     /* Please note that we don't update s->proplist here, because we
2322      * don't export that field */
2323
2324     return o;
2325 }
2326
2327 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2328     pa_operation *o;
2329     pa_tagstruct *t;
2330     uint32_t tag;
2331     const char * const*k;
2332
2333     pa_assert(s);
2334     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2335
2336     PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2337     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2338     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2339     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2340
2341     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2342
2343     t = pa_tagstruct_command(
2344             s->context,
2345             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2346             &tag);
2347     pa_tagstruct_putu32(t, s->channel);
2348
2349     for (k = keys; *k; k++)
2350         pa_tagstruct_puts(t, *k);
2351
2352     pa_tagstruct_puts(t, NULL);
2353
2354     pa_pstream_send_tagstruct(s->context->pstream, t);
2355     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2356
2357     /* Please note that we don't update s->proplist here, because we
2358      * don't export that field */
2359
2360     return o;
2361 }
2362
2363 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2364     pa_assert(s);
2365     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2366
2367     PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2368     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2369     PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2370
2371     s->direct_on_input = sink_input_idx;
2372
2373     return 0;
2374 }
2375
2376 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2377     pa_assert(s);
2378     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2379
2380     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2381     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2382
2383     return s->direct_on_input;
2384 }