Merge most of elmarco/rtclock2
[profile/ivi/pulseaudio.git] / src / pulse / stream.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/rtclock.h>
34 #include <pulse/xmalloc.h>
35
36 #include <pulsecore/pstream-util.h>
37 #include <pulsecore/log.h>
38 #include <pulsecore/hashmap.h>
39 #include <pulsecore/macro.h>
40 #include <pulsecore/core-rtclock.h>
41
42 #include "fork-detect.h"
43 #include "internal.h"
44
45 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
46 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
47
48 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
49 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
50 #define SMOOTHER_MIN_HISTORY (4)
51
52 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
53     return pa_stream_new_with_proplist(c, name, ss, map, NULL);
54 }
55
56 static void reset_callbacks(pa_stream *s) {
57     s->read_callback = NULL;
58     s->read_userdata = NULL;
59     s->write_callback = NULL;
60     s->write_userdata = NULL;
61     s->state_callback = NULL;
62     s->state_userdata = NULL;
63     s->overflow_callback = NULL;
64     s->overflow_userdata = NULL;
65     s->underflow_callback = NULL;
66     s->underflow_userdata = NULL;
67     s->latency_update_callback = NULL;
68     s->latency_update_userdata = NULL;
69     s->moved_callback = NULL;
70     s->moved_userdata = NULL;
71     s->suspended_callback = NULL;
72     s->suspended_userdata = NULL;
73     s->started_callback = NULL;
74     s->started_userdata = NULL;
75     s->event_callback = NULL;
76     s->event_userdata = NULL;
77     s->buffer_attr_callback = NULL;
78     s->buffer_attr_userdata = NULL;
79 }
80
81 pa_stream *pa_stream_new_with_proplist(
82         pa_context *c,
83         const char *name,
84         const pa_sample_spec *ss,
85         const pa_channel_map *map,
86         pa_proplist *p) {
87
88     pa_stream *s;
89     int i;
90     pa_channel_map tmap;
91
92     pa_assert(c);
93     pa_assert(PA_REFCNT_VALUE(c) >= 1);
94
95     PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
96     PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
97     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
98     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
99     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
100     PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
101     PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
102
103     if (!map)
104         PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
105
106     s = pa_xnew(pa_stream, 1);
107     PA_REFCNT_INIT(s);
108     s->context = c;
109     s->mainloop = c->mainloop;
110
111     s->direction = PA_STREAM_NODIRECTION;
112     s->state = PA_STREAM_UNCONNECTED;
113     s->flags = 0;
114
115     s->sample_spec = *ss;
116     s->channel_map = *map;
117
118     s->direct_on_input = PA_INVALID_INDEX;
119
120     s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
121     if (name)
122         pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
123
124     s->channel = 0;
125     s->channel_valid = FALSE;
126     s->syncid = c->csyncid++;
127     s->stream_index = PA_INVALID_INDEX;
128
129     s->requested_bytes = 0;
130     memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
131
132     /* We initialize der target length here, so that if the user
133      * passes no explicit buffering metrics the default is similar to
134      * what older PA versions provided. */
135
136     s->buffer_attr.maxlength = (uint32_t) -1;
137     s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
138     s->buffer_attr.minreq = (uint32_t) -1;
139     s->buffer_attr.prebuf = (uint32_t) -1;
140     s->buffer_attr.fragsize = (uint32_t) -1;
141
142     s->device_index = PA_INVALID_INDEX;
143     s->device_name = NULL;
144     s->suspended = FALSE;
145     s->corked = FALSE;
146
147     pa_memchunk_reset(&s->peek_memchunk);
148     s->peek_data = NULL;
149
150     s->record_memblockq = NULL;
151
152
153     memset(&s->timing_info, 0, sizeof(s->timing_info));
154     s->timing_info_valid = FALSE;
155
156     s->previous_time = 0;
157
158     s->read_index_not_before = 0;
159     s->write_index_not_before = 0;
160     for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
161         s->write_index_corrections[i].valid = 0;
162     s->current_write_index_correction = 0;
163
164     s->auto_timing_update_event = NULL;
165     s->auto_timing_update_requested = FALSE;
166     s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
167
168     reset_callbacks(s);
169
170     s->smoother = NULL;
171
172     /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
173     PA_LLIST_PREPEND(pa_stream, c->streams, s);
174     pa_stream_ref(s);
175
176     return s;
177 }
178
179 static void stream_unlink(pa_stream *s) {
180     pa_operation *o, *n;
181     pa_assert(s);
182
183     if (!s->context)
184         return;
185
186     /* Detach from context */
187
188     /* Unref all operatio object that point to us */
189     for (o = s->context->operations; o; o = n) {
190         n = o->next;
191
192         if (o->stream == s)
193             pa_operation_cancel(o);
194     }
195
196     /* Drop all outstanding replies for this stream */
197     if (s->context->pdispatch)
198         pa_pdispatch_unregister_reply(s->context->pdispatch, s);
199
200     if (s->channel_valid) {
201         pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL);
202         s->channel = 0;
203         s->channel_valid = FALSE;
204     }
205
206     PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
207     pa_stream_unref(s);
208
209     s->context = NULL;
210
211     if (s->auto_timing_update_event) {
212         pa_assert(s->mainloop);
213         s->mainloop->time_free(s->auto_timing_update_event);
214     }
215
216     reset_callbacks(s);
217 }
218
219 static void stream_free(pa_stream *s) {
220     pa_assert(s);
221
222     stream_unlink(s);
223
224     if (s->peek_memchunk.memblock) {
225         if (s->peek_data)
226             pa_memblock_release(s->peek_memchunk.memblock);
227         pa_memblock_unref(s->peek_memchunk.memblock);
228     }
229
230     if (s->record_memblockq)
231         pa_memblockq_free(s->record_memblockq);
232
233     if (s->proplist)
234         pa_proplist_free(s->proplist);
235
236     if (s->smoother)
237         pa_smoother_free(s->smoother);
238
239     pa_xfree(s->device_name);
240     pa_xfree(s);
241 }
242
243 void pa_stream_unref(pa_stream *s) {
244     pa_assert(s);
245     pa_assert(PA_REFCNT_VALUE(s) >= 1);
246
247     if (PA_REFCNT_DEC(s) <= 0)
248         stream_free(s);
249 }
250
251 pa_stream* pa_stream_ref(pa_stream *s) {
252     pa_assert(s);
253     pa_assert(PA_REFCNT_VALUE(s) >= 1);
254
255     PA_REFCNT_INC(s);
256     return s;
257 }
258
259 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
260     pa_assert(s);
261     pa_assert(PA_REFCNT_VALUE(s) >= 1);
262
263     return s->state;
264 }
265
266 pa_context* pa_stream_get_context(pa_stream *s) {
267     pa_assert(s);
268     pa_assert(PA_REFCNT_VALUE(s) >= 1);
269
270     return s->context;
271 }
272
273 uint32_t pa_stream_get_index(pa_stream *s) {
274     pa_assert(s);
275     pa_assert(PA_REFCNT_VALUE(s) >= 1);
276
277     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
278     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
279
280     return s->stream_index;
281 }
282
283 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
284     pa_assert(s);
285     pa_assert(PA_REFCNT_VALUE(s) >= 1);
286
287     if (s->state == st)
288         return;
289
290     pa_stream_ref(s);
291
292     s->state = st;
293
294     if (s->state_callback)
295         s->state_callback(s, s->state_userdata);
296
297     if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
298         stream_unlink(s);
299
300     pa_stream_unref(s);
301 }
302
303 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
304     pa_assert(s);
305     pa_assert(PA_REFCNT_VALUE(s) >= 1);
306
307     if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
308         return;
309
310     if (s->state == PA_STREAM_READY &&
311         (force || !s->auto_timing_update_requested)) {
312         pa_operation *o;
313
314 /*         pa_log("Automatically requesting new timing data"); */
315
316         if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
317             pa_operation_unref(o);
318             s->auto_timing_update_requested = TRUE;
319         }
320     }
321
322     if (s->auto_timing_update_event) {
323         if (force)
324             s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
325
326         pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
327
328         s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
329     }
330 }
331
332 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
333     pa_context *c = userdata;
334     pa_stream *s;
335     uint32_t channel;
336
337     pa_assert(pd);
338     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
339     pa_assert(t);
340     pa_assert(c);
341     pa_assert(PA_REFCNT_VALUE(c) >= 1);
342
343     pa_context_ref(c);
344
345     if (pa_tagstruct_getu32(t, &channel) < 0 ||
346         !pa_tagstruct_eof(t)) {
347         pa_context_fail(c, PA_ERR_PROTOCOL);
348         goto finish;
349     }
350
351     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel)))
352         goto finish;
353
354     if (s->state != PA_STREAM_READY)
355         goto finish;
356
357     pa_context_set_error(c, PA_ERR_KILLED);
358     pa_stream_set_state(s, PA_STREAM_FAILED);
359
360 finish:
361     pa_context_unref(c);
362 }
363
364 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
365     pa_usec_t x;
366
367     pa_assert(s);
368     pa_assert(!force_start || !force_stop);
369
370     if (!s->smoother)
371         return;
372
373     x = pa_rtclock_now();
374
375     if (s->timing_info_valid) {
376         if (aposteriori)
377             x -= s->timing_info.transport_usec;
378         else
379             x += s->timing_info.transport_usec;
380     }
381
382     if (s->suspended || s->corked || force_stop)
383         pa_smoother_pause(s->smoother, x);
384     else if (force_start || s->buffer_attr.prebuf == 0)
385         pa_smoother_resume(s->smoother, x, TRUE);
386
387
388     /* Please note that we have no idea if playback actually started
389      * if prebuf is non-zero! */
390 }
391
392 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
393     pa_context *c = userdata;
394     pa_stream *s;
395     uint32_t channel;
396     const char *dn;
397     pa_bool_t suspended;
398     uint32_t di;
399     pa_usec_t usec = 0;
400     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
401
402     pa_assert(pd);
403     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
404     pa_assert(t);
405     pa_assert(c);
406     pa_assert(PA_REFCNT_VALUE(c) >= 1);
407
408     pa_context_ref(c);
409
410     if (c->version < 12) {
411         pa_context_fail(c, PA_ERR_PROTOCOL);
412         goto finish;
413     }
414
415     if (pa_tagstruct_getu32(t, &channel) < 0 ||
416         pa_tagstruct_getu32(t, &di) < 0 ||
417         pa_tagstruct_gets(t, &dn) < 0 ||
418         pa_tagstruct_get_boolean(t, &suspended) < 0) {
419         pa_context_fail(c, PA_ERR_PROTOCOL);
420         goto finish;
421     }
422
423     if (c->version >= 13) {
424
425         if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
426             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
427                 pa_tagstruct_getu32(t, &fragsize) < 0 ||
428                 pa_tagstruct_get_usec(t, &usec) < 0) {
429                 pa_context_fail(c, PA_ERR_PROTOCOL);
430                 goto finish;
431             }
432         } else {
433             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
434                 pa_tagstruct_getu32(t, &tlength) < 0 ||
435                 pa_tagstruct_getu32(t, &prebuf) < 0 ||
436                 pa_tagstruct_getu32(t, &minreq) < 0 ||
437                 pa_tagstruct_get_usec(t, &usec) < 0) {
438                 pa_context_fail(c, PA_ERR_PROTOCOL);
439                 goto finish;
440             }
441         }
442     }
443
444     if (!pa_tagstruct_eof(t)) {
445         pa_context_fail(c, PA_ERR_PROTOCOL);
446         goto finish;
447     }
448
449     if (!dn || di == PA_INVALID_INDEX) {
450         pa_context_fail(c, PA_ERR_PROTOCOL);
451         goto finish;
452     }
453
454     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, channel)))
455         goto finish;
456
457     if (s->state != PA_STREAM_READY)
458         goto finish;
459
460     if (c->version >= 13) {
461         if (s->direction == PA_STREAM_RECORD)
462             s->timing_info.configured_source_usec = usec;
463         else
464             s->timing_info.configured_sink_usec = usec;
465
466         s->buffer_attr.maxlength = maxlength;
467         s->buffer_attr.fragsize = fragsize;
468         s->buffer_attr.tlength = tlength;
469         s->buffer_attr.prebuf = prebuf;
470         s->buffer_attr.minreq = minreq;
471     }
472
473     pa_xfree(s->device_name);
474     s->device_name = pa_xstrdup(dn);
475     s->device_index = di;
476
477     s->suspended = suspended;
478
479     check_smoother_status(s, TRUE, FALSE, FALSE);
480     request_auto_timing_update(s, TRUE);
481
482     if (s->moved_callback)
483         s->moved_callback(s, s->moved_userdata);
484
485 finish:
486     pa_context_unref(c);
487 }
488
489 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
490     pa_context *c = userdata;
491     pa_stream *s;
492     uint32_t channel;
493     pa_usec_t usec = 0;
494     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
495
496     pa_assert(pd);
497     pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
498     pa_assert(t);
499     pa_assert(c);
500     pa_assert(PA_REFCNT_VALUE(c) >= 1);
501
502     pa_context_ref(c);
503
504     if (c->version < 15) {
505         pa_context_fail(c, PA_ERR_PROTOCOL);
506         goto finish;
507     }
508
509     if (pa_tagstruct_getu32(t, &channel) < 0) {
510         pa_context_fail(c, PA_ERR_PROTOCOL);
511         goto finish;
512     }
513
514     if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
515         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
516             pa_tagstruct_getu32(t, &fragsize) < 0 ||
517             pa_tagstruct_get_usec(t, &usec) < 0) {
518             pa_context_fail(c, PA_ERR_PROTOCOL);
519             goto finish;
520         }
521     } else {
522         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
523             pa_tagstruct_getu32(t, &tlength) < 0 ||
524             pa_tagstruct_getu32(t, &prebuf) < 0 ||
525             pa_tagstruct_getu32(t, &minreq) < 0 ||
526             pa_tagstruct_get_usec(t, &usec) < 0) {
527             pa_context_fail(c, PA_ERR_PROTOCOL);
528             goto finish;
529         }
530     }
531
532     if (!pa_tagstruct_eof(t)) {
533         pa_context_fail(c, PA_ERR_PROTOCOL);
534         goto finish;
535     }
536
537     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, channel)))
538         goto finish;
539
540     if (s->state != PA_STREAM_READY)
541         goto finish;
542
543     if (s->direction == PA_STREAM_RECORD)
544         s->timing_info.configured_source_usec = usec;
545     else
546         s->timing_info.configured_sink_usec = usec;
547
548     s->buffer_attr.maxlength = maxlength;
549     s->buffer_attr.fragsize = fragsize;
550     s->buffer_attr.tlength = tlength;
551     s->buffer_attr.prebuf = prebuf;
552     s->buffer_attr.minreq = minreq;
553
554     request_auto_timing_update(s, TRUE);
555
556     if (s->buffer_attr_callback)
557         s->buffer_attr_callback(s, s->buffer_attr_userdata);
558
559 finish:
560     pa_context_unref(c);
561 }
562
563 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
564     pa_context *c = userdata;
565     pa_stream *s;
566     uint32_t channel;
567     pa_bool_t suspended;
568
569     pa_assert(pd);
570     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
571     pa_assert(t);
572     pa_assert(c);
573     pa_assert(PA_REFCNT_VALUE(c) >= 1);
574
575     pa_context_ref(c);
576
577     if (c->version < 12) {
578         pa_context_fail(c, PA_ERR_PROTOCOL);
579         goto finish;
580     }
581
582     if (pa_tagstruct_getu32(t, &channel) < 0 ||
583         pa_tagstruct_get_boolean(t, &suspended) < 0 ||
584         !pa_tagstruct_eof(t)) {
585         pa_context_fail(c, PA_ERR_PROTOCOL);
586         goto finish;
587     }
588
589     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, channel)))
590         goto finish;
591
592     if (s->state != PA_STREAM_READY)
593         goto finish;
594
595     s->suspended = suspended;
596
597     check_smoother_status(s, TRUE, FALSE, FALSE);
598     request_auto_timing_update(s, TRUE);
599
600     if (s->suspended_callback)
601         s->suspended_callback(s, s->suspended_userdata);
602
603 finish:
604     pa_context_unref(c);
605 }
606
607 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
608     pa_context *c = userdata;
609     pa_stream *s;
610     uint32_t channel;
611
612     pa_assert(pd);
613     pa_assert(command == PA_COMMAND_STARTED);
614     pa_assert(t);
615     pa_assert(c);
616     pa_assert(PA_REFCNT_VALUE(c) >= 1);
617
618     pa_context_ref(c);
619
620     if (c->version < 13) {
621         pa_context_fail(c, PA_ERR_PROTOCOL);
622         goto finish;
623     }
624
625     if (pa_tagstruct_getu32(t, &channel) < 0 ||
626         !pa_tagstruct_eof(t)) {
627         pa_context_fail(c, PA_ERR_PROTOCOL);
628         goto finish;
629     }
630
631     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
632         goto finish;
633
634     if (s->state != PA_STREAM_READY)
635         goto finish;
636
637     check_smoother_status(s, TRUE, TRUE, FALSE);
638     request_auto_timing_update(s, TRUE);
639
640     if (s->started_callback)
641         s->started_callback(s, s->started_userdata);
642
643 finish:
644     pa_context_unref(c);
645 }
646
647 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
648     pa_context *c = userdata;
649     pa_stream *s;
650     uint32_t channel;
651     pa_proplist *pl = NULL;
652     const char *event;
653
654     pa_assert(pd);
655     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
656     pa_assert(t);
657     pa_assert(c);
658     pa_assert(PA_REFCNT_VALUE(c) >= 1);
659
660     pa_context_ref(c);
661
662     if (c->version < 15) {
663         pa_context_fail(c, PA_ERR_PROTOCOL);
664         goto finish;
665     }
666
667     pl = pa_proplist_new();
668
669     if (pa_tagstruct_getu32(t, &channel) < 0 ||
670         pa_tagstruct_gets(t, &event) < 0 ||
671         pa_tagstruct_get_proplist(t, pl) < 0 ||
672         !pa_tagstruct_eof(t) || !event) {
673         pa_context_fail(c, PA_ERR_PROTOCOL);
674         goto finish;
675     }
676
677     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, channel)))
678         goto finish;
679
680     if (s->state != PA_STREAM_READY)
681         goto finish;
682
683     if (s->event_callback)
684         s->event_callback(s, event, pl, s->event_userdata);
685
686 finish:
687     pa_context_unref(c);
688
689     if (pl)
690         pa_proplist_free(pl);
691 }
692
693 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
694     pa_stream *s;
695     pa_context *c = userdata;
696     uint32_t bytes, channel;
697
698     pa_assert(pd);
699     pa_assert(command == PA_COMMAND_REQUEST);
700     pa_assert(t);
701     pa_assert(c);
702     pa_assert(PA_REFCNT_VALUE(c) >= 1);
703
704     pa_context_ref(c);
705
706     if (pa_tagstruct_getu32(t, &channel) < 0 ||
707         pa_tagstruct_getu32(t, &bytes) < 0 ||
708         !pa_tagstruct_eof(t)) {
709         pa_context_fail(c, PA_ERR_PROTOCOL);
710         goto finish;
711     }
712
713     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
714         goto finish;
715
716     if (s->state != PA_STREAM_READY)
717         goto finish;
718
719     s->requested_bytes += bytes;
720
721     if (s->requested_bytes > 0 && s->write_callback)
722         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
723
724 finish:
725     pa_context_unref(c);
726 }
727
728 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
729     pa_stream *s;
730     pa_context *c = userdata;
731     uint32_t channel;
732
733     pa_assert(pd);
734     pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
735     pa_assert(t);
736     pa_assert(c);
737     pa_assert(PA_REFCNT_VALUE(c) >= 1);
738
739     pa_context_ref(c);
740
741     if (pa_tagstruct_getu32(t, &channel) < 0 ||
742         !pa_tagstruct_eof(t)) {
743         pa_context_fail(c, PA_ERR_PROTOCOL);
744         goto finish;
745     }
746
747     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
748         goto finish;
749
750     if (s->state != PA_STREAM_READY)
751         goto finish;
752
753     if (s->buffer_attr.prebuf > 0)
754         check_smoother_status(s, TRUE, FALSE, TRUE);
755
756     request_auto_timing_update(s, TRUE);
757
758     if (command == PA_COMMAND_OVERFLOW) {
759         if (s->overflow_callback)
760             s->overflow_callback(s, s->overflow_userdata);
761     } else if (command == PA_COMMAND_UNDERFLOW) {
762         if (s->underflow_callback)
763             s->underflow_callback(s, s->underflow_userdata);
764     }
765
766  finish:
767     pa_context_unref(c);
768 }
769
770 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
771     pa_assert(s);
772     pa_assert(PA_REFCNT_VALUE(s) >= 1);
773
774 /*     pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
775
776     if (s->state != PA_STREAM_READY)
777         return;
778
779     if (w) {
780         s->write_index_not_before = s->context->ctag;
781
782         if (s->timing_info_valid)
783             s->timing_info.write_index_corrupt = TRUE;
784
785 /*         pa_log("write_index invalidated"); */
786     }
787
788     if (r) {
789         s->read_index_not_before = s->context->ctag;
790
791         if (s->timing_info_valid)
792             s->timing_info.read_index_corrupt = TRUE;
793
794 /*         pa_log("read_index invalidated"); */
795     }
796
797     request_auto_timing_update(s, TRUE);
798 }
799
800 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
801     pa_stream *s = userdata;
802
803     pa_assert(s);
804     pa_assert(PA_REFCNT_VALUE(s) >= 1);
805
806     pa_stream_ref(s);
807     request_auto_timing_update(s, FALSE);
808     pa_stream_unref(s);
809 }
810
811 static void create_stream_complete(pa_stream *s) {
812     pa_assert(s);
813     pa_assert(PA_REFCNT_VALUE(s) >= 1);
814     pa_assert(s->state == PA_STREAM_CREATING);
815
816     pa_stream_set_state(s, PA_STREAM_READY);
817
818     if (s->requested_bytes > 0 && s->write_callback)
819         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
820
821     if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
822         s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
823         pa_assert(!s->auto_timing_update_event);
824         s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
825
826         request_auto_timing_update(s, TRUE);
827     }
828
829     check_smoother_status(s, TRUE, FALSE, FALSE);
830 }
831
832 static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_sample_spec *ss) {
833     pa_assert(s);
834     pa_assert(attr);
835     pa_assert(ss);
836
837     if (s->context->version >= 13)
838         return;
839
840     /* Version older than 0.9.10 didn't do server side buffer_attr
841      * selection, hence we have to fake it on the client side. */
842
843     /* We choose fairly conservative values here, to not confuse
844      * old clients with extremely large playback buffers */
845
846     if (attr->maxlength == (uint32_t) -1)
847         attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
848
849     if (attr->tlength == (uint32_t) -1)
850         attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
851
852     if (attr->minreq == (uint32_t) -1)
853         attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
854
855     if (attr->prebuf == (uint32_t) -1)
856         attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
857
858     if (attr->fragsize == (uint32_t) -1)
859         attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
860 }
861
862 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
863     pa_stream *s = userdata;
864     uint32_t requested_bytes;
865
866     pa_assert(pd);
867     pa_assert(s);
868     pa_assert(PA_REFCNT_VALUE(s) >= 1);
869     pa_assert(s->state == PA_STREAM_CREATING);
870
871     pa_stream_ref(s);
872
873     if (command != PA_COMMAND_REPLY) {
874         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
875             goto finish;
876
877         pa_stream_set_state(s, PA_STREAM_FAILED);
878         goto finish;
879     }
880
881     if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
882         s->channel == PA_INVALID_INDEX ||
883         ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 ||  s->stream_index == PA_INVALID_INDEX)) ||
884         ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
885         pa_context_fail(s->context, PA_ERR_PROTOCOL);
886         goto finish;
887     }
888
889     s->requested_bytes = (int64_t) requested_bytes;
890
891     if (s->context->version >= 9) {
892         if (s->direction == PA_STREAM_PLAYBACK) {
893             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
894                 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
895                 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
896                 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
897                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
898                 goto finish;
899             }
900         } else if (s->direction == PA_STREAM_RECORD) {
901             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
902                 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
903                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
904                 goto finish;
905             }
906         }
907     }
908
909     if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
910         pa_sample_spec ss;
911         pa_channel_map cm;
912         const char *dn = NULL;
913         pa_bool_t suspended;
914
915         if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
916             pa_tagstruct_get_channel_map(t, &cm) < 0 ||
917             pa_tagstruct_getu32(t, &s->device_index) < 0 ||
918             pa_tagstruct_gets(t, &dn) < 0 ||
919             pa_tagstruct_get_boolean(t, &suspended) < 0) {
920             pa_context_fail(s->context, PA_ERR_PROTOCOL);
921             goto finish;
922         }
923
924         if (!dn || s->device_index == PA_INVALID_INDEX ||
925             ss.channels != cm.channels ||
926             !pa_channel_map_valid(&cm) ||
927             !pa_sample_spec_valid(&ss) ||
928             (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
929             (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
930             (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))) {
931             pa_context_fail(s->context, PA_ERR_PROTOCOL);
932             goto finish;
933         }
934
935         pa_xfree(s->device_name);
936         s->device_name = pa_xstrdup(dn);
937         s->suspended = suspended;
938
939         s->channel_map = cm;
940         s->sample_spec = ss;
941     }
942
943     if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
944         pa_usec_t usec;
945
946         if (pa_tagstruct_get_usec(t, &usec) < 0) {
947             pa_context_fail(s->context, PA_ERR_PROTOCOL);
948             goto finish;
949         }
950
951         if (s->direction == PA_STREAM_RECORD)
952             s->timing_info.configured_source_usec = usec;
953         else
954             s->timing_info.configured_sink_usec = usec;
955     }
956
957     if (!pa_tagstruct_eof(t)) {
958         pa_context_fail(s->context, PA_ERR_PROTOCOL);
959         goto finish;
960     }
961
962     if (s->direction == PA_STREAM_RECORD) {
963         pa_assert(!s->record_memblockq);
964
965         s->record_memblockq = pa_memblockq_new(
966                 0,
967                 s->buffer_attr.maxlength,
968                 0,
969                 pa_frame_size(&s->sample_spec),
970                 1,
971                 0,
972                 0,
973                 NULL);
974     }
975
976     s->channel_valid = TRUE;
977     pa_dynarray_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, s->channel, s);
978
979     create_stream_complete(s);
980
981 finish:
982     pa_stream_unref(s);
983 }
984
985 static int create_stream(
986         pa_stream_direction_t direction,
987         pa_stream *s,
988         const char *dev,
989         const pa_buffer_attr *attr,
990         pa_stream_flags_t flags,
991         const pa_cvolume *volume,
992         pa_stream *sync_stream) {
993
994     pa_tagstruct *t;
995     uint32_t tag;
996     pa_bool_t volume_set = FALSE;
997
998     pa_assert(s);
999     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1000     pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1001
1002     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1003     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1004     PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1005     PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1006                                               PA_STREAM_INTERPOLATE_TIMING|
1007                                               PA_STREAM_NOT_MONOTONIC|
1008                                               PA_STREAM_AUTO_TIMING_UPDATE|
1009                                               PA_STREAM_NO_REMAP_CHANNELS|
1010                                               PA_STREAM_NO_REMIX_CHANNELS|
1011                                               PA_STREAM_FIX_FORMAT|
1012                                               PA_STREAM_FIX_RATE|
1013                                               PA_STREAM_FIX_CHANNELS|
1014                                               PA_STREAM_DONT_MOVE|
1015                                               PA_STREAM_VARIABLE_RATE|
1016                                               PA_STREAM_PEAK_DETECT|
1017                                               PA_STREAM_START_MUTED|
1018                                               PA_STREAM_ADJUST_LATENCY|
1019                                               PA_STREAM_EARLY_REQUESTS|
1020                                               PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1021                                               PA_STREAM_START_UNMUTED|
1022                                               PA_STREAM_FAIL_ON_SUSPEND)), PA_ERR_INVALID);
1023
1024     PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1025     PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1026     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1027     /* Althought some of the other flags are not supported on older
1028      * version, we don't check for them here, because it doesn't hurt
1029      * when they are passed but actually not supported. This makes
1030      * client development easier */
1031
1032     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1033     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1034     PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
1035     PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1036     PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1037
1038     pa_stream_ref(s);
1039
1040     s->direction = direction;
1041     s->flags = flags;
1042     s->corked = !!(flags & PA_STREAM_START_CORKED);
1043
1044     if (sync_stream)
1045         s->syncid = sync_stream->syncid;
1046
1047     if (attr)
1048         s->buffer_attr = *attr;
1049     automatic_buffer_attr(s, &s->buffer_attr, &s->sample_spec);
1050
1051     if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1052         pa_usec_t x;
1053
1054         x = pa_rtclock_now();
1055
1056         pa_assert(!s->smoother);
1057         s->smoother = pa_smoother_new(
1058                 SMOOTHER_ADJUST_TIME,
1059                 SMOOTHER_HISTORY_TIME,
1060                 !(flags & PA_STREAM_NOT_MONOTONIC),
1061                 TRUE,
1062                 SMOOTHER_MIN_HISTORY,
1063                 x,
1064                 TRUE);
1065     }
1066
1067     if (!dev)
1068         dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1069
1070     t = pa_tagstruct_command(
1071             s->context,
1072             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1073             &tag);
1074
1075     if (s->context->version < 13)
1076         pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1077
1078     pa_tagstruct_put(
1079             t,
1080             PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1081             PA_TAG_CHANNEL_MAP, &s->channel_map,
1082             PA_TAG_U32, PA_INVALID_INDEX,
1083             PA_TAG_STRING, dev,
1084             PA_TAG_U32, s->buffer_attr.maxlength,
1085             PA_TAG_BOOLEAN, s->corked,
1086             PA_TAG_INVALID);
1087
1088     if (s->direction == PA_STREAM_PLAYBACK) {
1089         pa_cvolume cv;
1090
1091         pa_tagstruct_put(
1092                 t,
1093                 PA_TAG_U32, s->buffer_attr.tlength,
1094                 PA_TAG_U32, s->buffer_attr.prebuf,
1095                 PA_TAG_U32, s->buffer_attr.minreq,
1096                 PA_TAG_U32, s->syncid,
1097                 PA_TAG_INVALID);
1098
1099         volume_set = !!volume;
1100
1101         if (!volume)
1102             volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1103
1104         pa_tagstruct_put_cvolume(t, volume);
1105     } else
1106         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1107
1108     if (s->context->version >= 12) {
1109         pa_tagstruct_put(
1110                 t,
1111                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1112                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1113                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1114                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1115                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1116                 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1117                 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1118                 PA_TAG_INVALID);
1119     }
1120
1121     if (s->context->version >= 13) {
1122
1123         if (s->direction == PA_STREAM_PLAYBACK)
1124             pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1125         else
1126             pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1127
1128         pa_tagstruct_put(
1129                 t,
1130                 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1131                 PA_TAG_PROPLIST, s->proplist,
1132                 PA_TAG_INVALID);
1133
1134         if (s->direction == PA_STREAM_RECORD)
1135             pa_tagstruct_putu32(t, s->direct_on_input);
1136     }
1137
1138     if (s->context->version >= 14) {
1139
1140         if (s->direction == PA_STREAM_PLAYBACK)
1141             pa_tagstruct_put_boolean(t, volume_set);
1142
1143         pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1144     }
1145
1146     if (s->context->version >= 15) {
1147
1148         if (s->direction == PA_STREAM_PLAYBACK)
1149             pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1150
1151         pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1152         pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1153     }
1154
1155     pa_pstream_send_tagstruct(s->context->pstream, t);
1156     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1157
1158     pa_stream_set_state(s, PA_STREAM_CREATING);
1159
1160     pa_stream_unref(s);
1161     return 0;
1162 }
1163
1164 int pa_stream_connect_playback(
1165         pa_stream *s,
1166         const char *dev,
1167         const pa_buffer_attr *attr,
1168         pa_stream_flags_t flags,
1169         pa_cvolume *volume,
1170         pa_stream *sync_stream) {
1171
1172     pa_assert(s);
1173     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1174
1175     return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1176 }
1177
1178 int pa_stream_connect_record(
1179         pa_stream *s,
1180         const char *dev,
1181         const pa_buffer_attr *attr,
1182         pa_stream_flags_t flags) {
1183
1184     pa_assert(s);
1185     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1186
1187     return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1188 }
1189
1190 int pa_stream_write(
1191         pa_stream *s,
1192         const void *data,
1193         size_t length,
1194         void (*free_cb)(void *p),
1195         int64_t offset,
1196         pa_seek_mode_t seek) {
1197
1198     pa_memchunk chunk;
1199     pa_seek_mode_t t_seek;
1200     int64_t t_offset;
1201     size_t t_length;
1202     const void *t_data;
1203
1204     pa_assert(s);
1205     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1206     pa_assert(data);
1207
1208     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1209     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1210     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1211     PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1212     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1213
1214     if (length <= 0)
1215         return 0;
1216
1217     t_seek = seek;
1218     t_offset = offset;
1219     t_length = length;
1220     t_data = data;
1221
1222     while (t_length > 0) {
1223
1224         chunk.index = 0;
1225
1226         if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1227             chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1228             chunk.length = t_length;
1229         } else {
1230             void *d;
1231
1232             chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1233             chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1234
1235             d = pa_memblock_acquire(chunk.memblock);
1236             memcpy(d, t_data, chunk.length);
1237             pa_memblock_release(chunk.memblock);
1238         }
1239
1240         pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1241
1242         t_offset = 0;
1243         t_seek = PA_SEEK_RELATIVE;
1244
1245         t_data = (const uint8_t*) t_data + chunk.length;
1246         t_length -= chunk.length;
1247
1248         pa_memblock_unref(chunk.memblock);
1249     }
1250
1251     if (free_cb && pa_pstream_get_shm(s->context->pstream))
1252         free_cb((void*) data);
1253
1254     /* This is obviously wrong since we ignore the seeking index . But
1255      * that's OK, the server side applies the same error */
1256     s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1257
1258     if (s->direction == PA_STREAM_PLAYBACK) {
1259
1260         /* Update latency request correction */
1261         if (s->write_index_corrections[s->current_write_index_correction].valid) {
1262
1263             if (seek == PA_SEEK_ABSOLUTE) {
1264                 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1265                 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1266                 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1267             } else if (seek == PA_SEEK_RELATIVE) {
1268                 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1269                     s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1270             } else
1271                 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1272         }
1273
1274         /* Update the write index in the already available latency data */
1275         if (s->timing_info_valid) {
1276
1277             if (seek == PA_SEEK_ABSOLUTE) {
1278                 s->timing_info.write_index_corrupt = FALSE;
1279                 s->timing_info.write_index = offset + (int64_t) length;
1280             } else if (seek == PA_SEEK_RELATIVE) {
1281                 if (!s->timing_info.write_index_corrupt)
1282                     s->timing_info.write_index += offset + (int64_t) length;
1283             } else
1284                 s->timing_info.write_index_corrupt = TRUE;
1285         }
1286
1287         if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1288             request_auto_timing_update(s, TRUE);
1289     }
1290
1291     return 0;
1292 }
1293
1294 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1295     pa_assert(s);
1296     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1297     pa_assert(data);
1298     pa_assert(length);
1299
1300     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1301     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1302     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1303
1304     if (!s->peek_memchunk.memblock) {
1305
1306         if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1307             *data = NULL;
1308             *length = 0;
1309             return 0;
1310         }
1311
1312         s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1313     }
1314
1315     pa_assert(s->peek_data);
1316     *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1317     *length = s->peek_memchunk.length;
1318     return 0;
1319 }
1320
1321 int pa_stream_drop(pa_stream *s) {
1322     pa_assert(s);
1323     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1324
1325     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1326     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1327     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1328     PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1329
1330     pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1331
1332     /* Fix the simulated local read index */
1333     if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1334         s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1335
1336     pa_assert(s->peek_data);
1337     pa_memblock_release(s->peek_memchunk.memblock);
1338     pa_memblock_unref(s->peek_memchunk.memblock);
1339     pa_memchunk_reset(&s->peek_memchunk);
1340
1341     return 0;
1342 }
1343
1344 size_t pa_stream_writable_size(pa_stream *s) {
1345     pa_assert(s);
1346     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1347
1348     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1349     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1350     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1351
1352     return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1353 }
1354
1355 size_t pa_stream_readable_size(pa_stream *s) {
1356     pa_assert(s);
1357     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1358
1359     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1360     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1361     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1362
1363     return pa_memblockq_get_length(s->record_memblockq);
1364 }
1365
1366 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1367     pa_operation *o;
1368     pa_tagstruct *t;
1369     uint32_t tag;
1370
1371     pa_assert(s);
1372     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1373
1374     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1375     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1376     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1377
1378     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1379
1380     t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1381     pa_tagstruct_putu32(t, s->channel);
1382     pa_pstream_send_tagstruct(s->context->pstream, t);
1383     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1384
1385     return o;
1386 }
1387
1388 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1389     pa_usec_t usec;
1390
1391     pa_assert(s);
1392     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1393     pa_assert(s->state == PA_STREAM_READY);
1394     pa_assert(s->direction != PA_STREAM_UPLOAD);
1395     pa_assert(s->timing_info_valid);
1396     pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1397     pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1398
1399     if (s->direction == PA_STREAM_PLAYBACK) {
1400         /* The last byte that was written into the output device
1401          * had this time value associated */
1402         usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1403
1404         if (!s->corked && !s->suspended) {
1405
1406             if (!ignore_transport)
1407                 /* Because the latency info took a little time to come
1408                  * to us, we assume that the real output time is actually
1409                  * a little ahead */
1410                 usec += s->timing_info.transport_usec;
1411
1412             /* However, the output device usually maintains a buffer
1413                too, hence the real sample currently played is a little
1414                back  */
1415             if (s->timing_info.sink_usec >= usec)
1416                 usec = 0;
1417             else
1418                 usec -= s->timing_info.sink_usec;
1419         }
1420
1421     } else {
1422         pa_assert(s->direction == PA_STREAM_RECORD);
1423
1424         /* The last byte written into the server side queue had
1425          * this time value associated */
1426         usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1427
1428         if (!s->corked && !s->suspended) {
1429
1430             if (!ignore_transport)
1431                 /* Add transport latency */
1432                 usec += s->timing_info.transport_usec;
1433
1434             /* Add latency of data in device buffer */
1435             usec += s->timing_info.source_usec;
1436
1437             /* If this is a monitor source, we need to correct the
1438              * time by the playback device buffer */
1439             if (s->timing_info.sink_usec >= usec)
1440                 usec = 0;
1441             else
1442                 usec -= s->timing_info.sink_usec;
1443         }
1444     }
1445
1446     return usec;
1447 }
1448
1449 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1450     pa_operation *o = userdata;
1451     struct timeval local, remote, now;
1452     pa_timing_info *i;
1453     pa_bool_t playing = FALSE;
1454     uint64_t underrun_for = 0, playing_for = 0;
1455
1456     pa_assert(pd);
1457     pa_assert(o);
1458     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1459
1460     if (!o->context || !o->stream)
1461         goto finish;
1462
1463     i = &o->stream->timing_info;
1464
1465     o->stream->timing_info_valid = FALSE;
1466     i->write_index_corrupt = TRUE;
1467     i->read_index_corrupt = TRUE;
1468
1469     if (command != PA_COMMAND_REPLY) {
1470         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1471             goto finish;
1472
1473     } else {
1474
1475         if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1476             pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1477             pa_tagstruct_get_boolean(t, &playing) < 0 ||
1478             pa_tagstruct_get_timeval(t, &local) < 0 ||
1479             pa_tagstruct_get_timeval(t, &remote) < 0 ||
1480             pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1481             pa_tagstruct_gets64(t, &i->read_index) < 0) {
1482
1483             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1484             goto finish;
1485         }
1486
1487         if (o->context->version >= 13 &&
1488             o->stream->direction == PA_STREAM_PLAYBACK)
1489             if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1490                 pa_tagstruct_getu64(t, &playing_for) < 0) {
1491
1492                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1493                 goto finish;
1494             }
1495
1496
1497         if (!pa_tagstruct_eof(t)) {
1498             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1499             goto finish;
1500         }
1501         o->stream->timing_info_valid = TRUE;
1502         i->write_index_corrupt = FALSE;
1503         i->read_index_corrupt = FALSE;
1504
1505         i->playing = (int) playing;
1506         i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1507
1508         pa_gettimeofday(&now);
1509
1510         /* Calculcate timestamps */
1511         if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1512             /* local and remote seem to have synchronized clocks */
1513
1514             if (o->stream->direction == PA_STREAM_PLAYBACK)
1515                 i->transport_usec = pa_timeval_diff(&remote, &local);
1516             else
1517                 i->transport_usec = pa_timeval_diff(&now, &remote);
1518
1519             i->synchronized_clocks = TRUE;
1520             i->timestamp = remote;
1521         } else {
1522             /* clocks are not synchronized, let's estimate latency then */
1523             i->transport_usec = pa_timeval_diff(&now, &local)/2;
1524             i->synchronized_clocks = FALSE;
1525             i->timestamp = local;
1526             pa_timeval_add(&i->timestamp, i->transport_usec);
1527         }
1528
1529         /* Invalidate read and write indexes if necessary */
1530         if (tag < o->stream->read_index_not_before)
1531             i->read_index_corrupt = TRUE;
1532
1533         if (tag < o->stream->write_index_not_before)
1534             i->write_index_corrupt = TRUE;
1535
1536         if (o->stream->direction == PA_STREAM_PLAYBACK) {
1537             /* Write index correction */
1538
1539             int n, j;
1540             uint32_t ctag = tag;
1541
1542             /* Go through the saved correction values and add up the
1543              * total correction.*/
1544             for (n = 0, j = o->stream->current_write_index_correction+1;
1545                  n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1546                  n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1547
1548                 /* Step over invalid data or out-of-date data */
1549                 if (!o->stream->write_index_corrections[j].valid ||
1550                     o->stream->write_index_corrections[j].tag < ctag)
1551                     continue;
1552
1553                 /* Make sure that everything is in order */
1554                 ctag = o->stream->write_index_corrections[j].tag+1;
1555
1556                 /* Now fix the write index */
1557                 if (o->stream->write_index_corrections[j].corrupt) {
1558                     /* A corrupting seek was made */
1559                     i->write_index_corrupt = TRUE;
1560                 } else if (o->stream->write_index_corrections[j].absolute) {
1561                     /* An absolute seek was made */
1562                     i->write_index = o->stream->write_index_corrections[j].value;
1563                     i->write_index_corrupt = FALSE;
1564                 } else if (!i->write_index_corrupt) {
1565                     /* A relative seek was made */
1566                     i->write_index += o->stream->write_index_corrections[j].value;
1567                 }
1568             }
1569
1570             /* Clear old correction entries */
1571             for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1572                 if (!o->stream->write_index_corrections[n].valid)
1573                     continue;
1574
1575                 if (o->stream->write_index_corrections[n].tag <= tag)
1576                     o->stream->write_index_corrections[n].valid = FALSE;
1577             }
1578         }
1579
1580         if (o->stream->direction == PA_STREAM_RECORD) {
1581             /* Read index correction */
1582
1583             if (!i->read_index_corrupt)
1584                 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1585         }
1586
1587         /* Update smoother */
1588         if (o->stream->smoother) {
1589             pa_usec_t u, x;
1590
1591             u = x = pa_rtclock_now() - i->transport_usec;
1592
1593             if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1594                 pa_usec_t su;
1595
1596                 /* If we weren't playing then it will take some time
1597                  * until the audio will actually come out through the
1598                  * speakers. Since we follow that timing here, we need
1599                  * to try to fix this up */
1600
1601                 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1602
1603                 if (su < i->sink_usec)
1604                     x += i->sink_usec - su;
1605             }
1606
1607             if (!i->playing)
1608                 pa_smoother_pause(o->stream->smoother, x);
1609
1610             /* Update the smoother */
1611             if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1612                 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1613                 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1614
1615             if (i->playing)
1616                 pa_smoother_resume(o->stream->smoother, x, TRUE);
1617         }
1618     }
1619
1620     o->stream->auto_timing_update_requested = FALSE;
1621
1622     if (o->stream->latency_update_callback)
1623         o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1624
1625     if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1626         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1627         cb(o->stream, o->stream->timing_info_valid, o->userdata);
1628     }
1629
1630 finish:
1631
1632     pa_operation_done(o);
1633     pa_operation_unref(o);
1634 }
1635
1636 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1637     uint32_t tag;
1638     pa_operation *o;
1639     pa_tagstruct *t;
1640     struct timeval now;
1641     int cidx = 0;
1642
1643     pa_assert(s);
1644     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1645
1646     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1647     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1648     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1649
1650     if (s->direction == PA_STREAM_PLAYBACK) {
1651         /* Find a place to store the write_index correction data for this entry */
1652         cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1653
1654         /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1655         PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1656     }
1657     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1658
1659     t = pa_tagstruct_command(
1660             s->context,
1661             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1662             &tag);
1663     pa_tagstruct_putu32(t, s->channel);
1664     pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1665
1666     pa_pstream_send_tagstruct(s->context->pstream, t);
1667     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1668
1669     if (s->direction == PA_STREAM_PLAYBACK) {
1670         /* Fill in initial correction data */
1671
1672         s->current_write_index_correction = cidx;
1673
1674         s->write_index_corrections[cidx].valid = TRUE;
1675         s->write_index_corrections[cidx].absolute = FALSE;
1676         s->write_index_corrections[cidx].corrupt = FALSE;
1677         s->write_index_corrections[cidx].tag = tag;
1678         s->write_index_corrections[cidx].value = 0;
1679     }
1680
1681     return o;
1682 }
1683
1684 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1685     pa_stream *s = userdata;
1686
1687     pa_assert(pd);
1688     pa_assert(s);
1689     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1690
1691     pa_stream_ref(s);
1692
1693     if (command != PA_COMMAND_REPLY) {
1694         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1695             goto finish;
1696
1697         pa_stream_set_state(s, PA_STREAM_FAILED);
1698         goto finish;
1699     } else if (!pa_tagstruct_eof(t)) {
1700         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1701         goto finish;
1702     }
1703
1704     pa_stream_set_state(s, PA_STREAM_TERMINATED);
1705
1706 finish:
1707     pa_stream_unref(s);
1708 }
1709
1710 int pa_stream_disconnect(pa_stream *s) {
1711     pa_tagstruct *t;
1712     uint32_t tag;
1713
1714     pa_assert(s);
1715     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1716
1717     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1718     PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1719     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1720
1721     pa_stream_ref(s);
1722
1723     t = pa_tagstruct_command(
1724             s->context,
1725             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1726                         (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1727             &tag);
1728     pa_tagstruct_putu32(t, s->channel);
1729     pa_pstream_send_tagstruct(s->context->pstream, t);
1730     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1731
1732     pa_stream_unref(s);
1733     return 0;
1734 }
1735
1736 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1737     pa_assert(s);
1738     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1739
1740     if (pa_detect_fork())
1741         return;
1742
1743     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1744         return;
1745
1746     s->read_callback = cb;
1747     s->read_userdata = userdata;
1748 }
1749
1750 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1751     pa_assert(s);
1752     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1753
1754     if (pa_detect_fork())
1755         return;
1756
1757     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1758         return;
1759
1760     s->write_callback = cb;
1761     s->write_userdata = userdata;
1762 }
1763
1764 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1765     pa_assert(s);
1766     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1767
1768     if (pa_detect_fork())
1769         return;
1770
1771     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1772         return;
1773
1774     s->state_callback = cb;
1775     s->state_userdata = userdata;
1776 }
1777
1778 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1779     pa_assert(s);
1780     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1781
1782     if (pa_detect_fork())
1783         return;
1784
1785     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1786         return;
1787
1788     s->overflow_callback = cb;
1789     s->overflow_userdata = userdata;
1790 }
1791
1792 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1793     pa_assert(s);
1794     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1795
1796     if (pa_detect_fork())
1797         return;
1798
1799     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1800         return;
1801
1802     s->underflow_callback = cb;
1803     s->underflow_userdata = userdata;
1804 }
1805
1806 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1807     pa_assert(s);
1808     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1809
1810     if (pa_detect_fork())
1811         return;
1812
1813     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1814         return;
1815
1816     s->latency_update_callback = cb;
1817     s->latency_update_userdata = userdata;
1818 }
1819
1820 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1821     pa_assert(s);
1822     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1823
1824     if (pa_detect_fork())
1825         return;
1826
1827     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1828         return;
1829
1830     s->moved_callback = cb;
1831     s->moved_userdata = userdata;
1832 }
1833
1834 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1835     pa_assert(s);
1836     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1837
1838     if (pa_detect_fork())
1839         return;
1840
1841     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1842         return;
1843
1844     s->suspended_callback = cb;
1845     s->suspended_userdata = userdata;
1846 }
1847
1848 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1849     pa_assert(s);
1850     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1851
1852     if (pa_detect_fork())
1853         return;
1854
1855     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1856         return;
1857
1858     s->started_callback = cb;
1859     s->started_userdata = userdata;
1860 }
1861
1862 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
1863     pa_assert(s);
1864     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1865
1866     if (pa_detect_fork())
1867         return;
1868
1869     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1870         return;
1871
1872     s->event_callback = cb;
1873     s->event_userdata = userdata;
1874 }
1875
1876 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1877     pa_assert(s);
1878     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1879
1880     if (pa_detect_fork())
1881         return;
1882
1883     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1884         return;
1885
1886     s->buffer_attr_callback = cb;
1887     s->buffer_attr_userdata = userdata;
1888 }
1889
1890 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1891     pa_operation *o = userdata;
1892     int success = 1;
1893
1894     pa_assert(pd);
1895     pa_assert(o);
1896     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1897
1898     if (!o->context)
1899         goto finish;
1900
1901     if (command != PA_COMMAND_REPLY) {
1902         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1903             goto finish;
1904
1905         success = 0;
1906     } else if (!pa_tagstruct_eof(t)) {
1907         pa_context_fail(o->context, PA_ERR_PROTOCOL);
1908         goto finish;
1909     }
1910
1911     if (o->callback) {
1912         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1913         cb(o->stream, success, o->userdata);
1914     }
1915
1916 finish:
1917     pa_operation_done(o);
1918     pa_operation_unref(o);
1919 }
1920
1921 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
1922     pa_operation *o;
1923     pa_tagstruct *t;
1924     uint32_t tag;
1925
1926     pa_assert(s);
1927     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1928
1929     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1930     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1931     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1932
1933     s->corked = b;
1934
1935     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1936
1937     t = pa_tagstruct_command(
1938             s->context,
1939             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
1940             &tag);
1941     pa_tagstruct_putu32(t, s->channel);
1942     pa_tagstruct_put_boolean(t, !!b);
1943     pa_pstream_send_tagstruct(s->context->pstream, t);
1944     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1945
1946     check_smoother_status(s, FALSE, FALSE, FALSE);
1947
1948     /* This might cause the indexes to hang/start again, hence
1949      * let's request a timing update */
1950     request_auto_timing_update(s, TRUE);
1951
1952     return o;
1953 }
1954
1955 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
1956     pa_tagstruct *t;
1957     pa_operation *o;
1958     uint32_t tag;
1959
1960     pa_assert(s);
1961     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1962
1963     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1964     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1965
1966     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1967
1968     t = pa_tagstruct_command(s->context, command, &tag);
1969     pa_tagstruct_putu32(t, s->channel);
1970     pa_pstream_send_tagstruct(s->context->pstream, t);
1971     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1972
1973     return o;
1974 }
1975
1976 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1977     pa_operation *o;
1978
1979     pa_assert(s);
1980     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1981
1982     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1983     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1984     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1985
1986     if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
1987         return NULL;
1988
1989     if (s->direction == PA_STREAM_PLAYBACK) {
1990
1991         if (s->write_index_corrections[s->current_write_index_correction].valid)
1992             s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1993
1994         if (s->buffer_attr.prebuf > 0)
1995             check_smoother_status(s, FALSE, FALSE, TRUE);
1996
1997         /* This will change the write index, but leave the
1998          * read index untouched. */
1999         invalidate_indexes(s, FALSE, TRUE);
2000
2001     } else
2002         /* For record streams this has no influence on the write
2003          * index, but the read index might jump. */
2004         invalidate_indexes(s, TRUE, FALSE);
2005
2006     return o;
2007 }
2008
2009 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2010     pa_operation *o;
2011
2012     pa_assert(s);
2013     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2014
2015     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2016     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2017     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2018     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2019
2020     if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2021         return NULL;
2022
2023     /* This might cause the read index to hang again, hence
2024      * let's request a timing update */
2025     request_auto_timing_update(s, TRUE);
2026
2027     return o;
2028 }
2029
2030 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2031     pa_operation *o;
2032
2033     pa_assert(s);
2034     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2035
2036     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2037     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2038     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2039     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2040
2041     if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2042         return NULL;
2043
2044     /* This might cause the read index to start moving again, hence
2045      * let's request a timing update */
2046     request_auto_timing_update(s, TRUE);
2047
2048     return o;
2049 }
2050
2051 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2052     pa_operation *o;
2053
2054     pa_assert(s);
2055     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2056     pa_assert(name);
2057
2058     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2059     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2060     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2061
2062     if (s->context->version >= 13) {
2063         pa_proplist *p = pa_proplist_new();
2064
2065         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2066         o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2067         pa_proplist_free(p);
2068     } else {
2069         pa_tagstruct *t;
2070         uint32_t tag;
2071
2072         o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2073         t = pa_tagstruct_command(
2074                 s->context,
2075                 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2076                 &tag);
2077         pa_tagstruct_putu32(t, s->channel);
2078         pa_tagstruct_puts(t, name);
2079         pa_pstream_send_tagstruct(s->context->pstream, t);
2080         pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2081     }
2082
2083     return o;
2084 }
2085
2086 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2087     pa_usec_t usec;
2088
2089     pa_assert(s);
2090     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2091
2092     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2093     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2094     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2095     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2096     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2097     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2098
2099     if (s->smoother)
2100         usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2101     else
2102         usec = calc_time(s, FALSE);
2103
2104     /* Make sure the time runs monotonically */
2105     if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2106         if (usec < s->previous_time)
2107             usec = s->previous_time;
2108         else
2109             s->previous_time = usec;
2110     }
2111
2112     if (r_usec)
2113         *r_usec = usec;
2114
2115     return 0;
2116 }
2117
2118 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2119     pa_assert(s);
2120     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2121
2122     if (negative)
2123         *negative = 0;
2124
2125     if (a >= b)
2126         return a-b;
2127     else {
2128         if (negative && s->direction == PA_STREAM_RECORD) {
2129             *negative = 1;
2130             return b-a;
2131         } else
2132             return 0;
2133     }
2134 }
2135
2136 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2137     pa_usec_t t, c;
2138     int r;
2139     int64_t cindex;
2140
2141     pa_assert(s);
2142     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2143     pa_assert(r_usec);
2144
2145     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2146     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2147     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2148     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2149     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2150     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2151
2152     if ((r = pa_stream_get_time(s, &t)) < 0)
2153         return r;
2154
2155     if (s->direction == PA_STREAM_PLAYBACK)
2156         cindex = s->timing_info.write_index;
2157     else
2158         cindex = s->timing_info.read_index;
2159
2160     if (cindex < 0)
2161         cindex = 0;
2162
2163     c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2164
2165     if (s->direction == PA_STREAM_PLAYBACK)
2166         *r_usec = time_counter_diff(s, c, t, negative);
2167     else
2168         *r_usec = time_counter_diff(s, t, c, negative);
2169
2170     return 0;
2171 }
2172
2173 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2174     pa_assert(s);
2175     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2176
2177     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2178     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2179     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2180     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2181
2182     return &s->timing_info;
2183 }
2184
2185 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2186     pa_assert(s);
2187     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2188
2189     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2190
2191     return &s->sample_spec;
2192 }
2193
2194 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2195     pa_assert(s);
2196     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2197
2198     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2199
2200     return &s->channel_map;
2201 }
2202
2203 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2204     pa_assert(s);
2205     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2206
2207     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2208     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2209     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2210
2211     return &s->buffer_attr;
2212 }
2213
2214 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2215     pa_operation *o = userdata;
2216     int success = 1;
2217
2218     pa_assert(pd);
2219     pa_assert(o);
2220     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2221
2222     if (!o->context)
2223         goto finish;
2224
2225     if (command != PA_COMMAND_REPLY) {
2226         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2227             goto finish;
2228
2229         success = 0;
2230     } else {
2231         if (o->stream->direction == PA_STREAM_PLAYBACK) {
2232             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2233                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2234                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2235                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2236                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2237                 goto finish;
2238             }
2239         } else if (o->stream->direction == PA_STREAM_RECORD) {
2240             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2241                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2242                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2243                 goto finish;
2244             }
2245         }
2246
2247         if (o->stream->context->version >= 13) {
2248             pa_usec_t usec;
2249
2250             if (pa_tagstruct_get_usec(t, &usec) < 0) {
2251                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2252                 goto finish;
2253             }
2254
2255             if (o->stream->direction == PA_STREAM_RECORD)
2256                 o->stream->timing_info.configured_source_usec = usec;
2257             else
2258                 o->stream->timing_info.configured_sink_usec = usec;
2259         }
2260
2261         if (!pa_tagstruct_eof(t)) {
2262             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2263             goto finish;
2264         }
2265     }
2266
2267     if (o->callback) {
2268         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2269         cb(o->stream, success, o->userdata);
2270     }
2271
2272 finish:
2273     pa_operation_done(o);
2274     pa_operation_unref(o);
2275 }
2276
2277
2278 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2279     pa_operation *o;
2280     pa_tagstruct *t;
2281     uint32_t tag;
2282
2283     pa_assert(s);
2284     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2285     pa_assert(attr);
2286
2287     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2288     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2289     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2290     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2291
2292     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2293
2294     t = pa_tagstruct_command(
2295             s->context,
2296             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2297             &tag);
2298     pa_tagstruct_putu32(t, s->channel);
2299
2300     pa_tagstruct_putu32(t, attr->maxlength);
2301
2302     if (s->direction == PA_STREAM_PLAYBACK)
2303         pa_tagstruct_put(
2304                 t,
2305                 PA_TAG_U32, attr->tlength,
2306                 PA_TAG_U32, attr->prebuf,
2307                 PA_TAG_U32, attr->minreq,
2308                 PA_TAG_INVALID);
2309     else
2310         pa_tagstruct_putu32(t, attr->fragsize);
2311
2312     if (s->context->version >= 13)
2313         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2314
2315     if (s->context->version >= 14)
2316         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2317
2318     pa_pstream_send_tagstruct(s->context->pstream, t);
2319     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2320
2321     /* This might cause changes in the read/write indexex, hence let's
2322      * request a timing update */
2323     request_auto_timing_update(s, TRUE);
2324
2325     return o;
2326 }
2327
2328 uint32_t pa_stream_get_device_index(pa_stream *s) {
2329     pa_assert(s);
2330     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2331
2332     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2333     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2334     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2335     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2336     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2337
2338     return s->device_index;
2339 }
2340
2341 const char *pa_stream_get_device_name(pa_stream *s) {
2342     pa_assert(s);
2343     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2344
2345     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2346     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2347     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2348     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2349     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2350
2351     return s->device_name;
2352 }
2353
2354 int pa_stream_is_suspended(pa_stream *s) {
2355     pa_assert(s);
2356     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2357
2358     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2359     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2360     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2361     PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2362
2363     return s->suspended;
2364 }
2365
2366 int pa_stream_is_corked(pa_stream *s) {
2367     pa_assert(s);
2368     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2369
2370     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2371     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2372     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2373
2374     return s->corked;
2375 }
2376
2377 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2378     pa_operation *o = userdata;
2379     int success = 1;
2380
2381     pa_assert(pd);
2382     pa_assert(o);
2383     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2384
2385     if (!o->context)
2386         goto finish;
2387
2388     if (command != PA_COMMAND_REPLY) {
2389         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2390             goto finish;
2391
2392         success = 0;
2393     } else {
2394
2395         if (!pa_tagstruct_eof(t)) {
2396             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2397             goto finish;
2398         }
2399     }
2400
2401     o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2402     pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2403
2404     if (o->callback) {
2405         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2406         cb(o->stream, success, o->userdata);
2407     }
2408
2409 finish:
2410     pa_operation_done(o);
2411     pa_operation_unref(o);
2412 }
2413
2414
2415 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2416     pa_operation *o;
2417     pa_tagstruct *t;
2418     uint32_t tag;
2419
2420     pa_assert(s);
2421     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2422
2423     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2424     PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2425     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2426     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2427     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2428     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2429
2430     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2431     o->private = PA_UINT_TO_PTR(rate);
2432
2433     t = pa_tagstruct_command(
2434             s->context,
2435             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2436             &tag);
2437     pa_tagstruct_putu32(t, s->channel);
2438     pa_tagstruct_putu32(t, rate);
2439
2440     pa_pstream_send_tagstruct(s->context->pstream, t);
2441     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2442
2443     return o;
2444 }
2445
2446 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2447     pa_operation *o;
2448     pa_tagstruct *t;
2449     uint32_t tag;
2450
2451     pa_assert(s);
2452     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2453
2454     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2455     PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2456     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2457     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2458     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2459
2460     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2461
2462     t = pa_tagstruct_command(
2463             s->context,
2464             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2465             &tag);
2466     pa_tagstruct_putu32(t, s->channel);
2467     pa_tagstruct_putu32(t, (uint32_t) mode);
2468     pa_tagstruct_put_proplist(t, p);
2469
2470     pa_pstream_send_tagstruct(s->context->pstream, t);
2471     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2472
2473     /* Please note that we don't update s->proplist here, because we
2474      * don't export that field */
2475
2476     return o;
2477 }
2478
2479 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2480     pa_operation *o;
2481     pa_tagstruct *t;
2482     uint32_t tag;
2483     const char * const*k;
2484
2485     pa_assert(s);
2486     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2487
2488     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2489     PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2490     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2491     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2492     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2493
2494     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2495
2496     t = pa_tagstruct_command(
2497             s->context,
2498             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2499             &tag);
2500     pa_tagstruct_putu32(t, s->channel);
2501
2502     for (k = keys; *k; k++)
2503         pa_tagstruct_puts(t, *k);
2504
2505     pa_tagstruct_puts(t, NULL);
2506
2507     pa_pstream_send_tagstruct(s->context->pstream, t);
2508     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2509
2510     /* Please note that we don't update s->proplist here, because we
2511      * don't export that field */
2512
2513     return o;
2514 }
2515
2516 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2517     pa_assert(s);
2518     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2519
2520     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2521     PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2522     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2523     PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2524
2525     s->direct_on_input = sink_input_idx;
2526
2527     return 0;
2528 }
2529
2530 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2531     pa_assert(s);
2532     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2533
2534     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2535     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2536     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2537
2538     return s->direct_on_input;
2539 }