Merge commit 'elmarco/bluetooth-fixes'
[profile/ivi/pulseaudio.git] / src / pulse / stream.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/xmalloc.h>
34
35 #include <pulsecore/pstream-util.h>
36 #include <pulsecore/log.h>
37 #include <pulsecore/hashmap.h>
38 #include <pulsecore/macro.h>
39 #include <pulsecore/rtclock.h>
40
41 #include "fork-detect.h"
42 #include "internal.h"
43
44 #define LATENCY_IPOL_INTERVAL_USEC (333*PA_USEC_PER_MSEC)
45
46 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
47 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
48 #define SMOOTHER_MIN_HISTORY (4)
49
50 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
51     return pa_stream_new_with_proplist(c, name, ss, map, NULL);
52 }
53
54 static void reset_callbacks(pa_stream *s) {
55     s->read_callback = NULL;
56     s->read_userdata = NULL;
57     s->write_callback = NULL;
58     s->write_userdata = NULL;
59     s->state_callback = NULL;
60     s->state_userdata = NULL;
61     s->overflow_callback = NULL;
62     s->overflow_userdata = NULL;
63     s->underflow_callback = NULL;
64     s->underflow_userdata = NULL;
65     s->latency_update_callback = NULL;
66     s->latency_update_userdata = NULL;
67     s->moved_callback = NULL;
68     s->moved_userdata = NULL;
69     s->suspended_callback = NULL;
70     s->suspended_userdata = NULL;
71     s->started_callback = NULL;
72     s->started_userdata = NULL;
73     s->event_callback = NULL;
74     s->event_userdata = NULL;
75     s->buffer_attr_callback = NULL;
76     s->buffer_attr_userdata = NULL;
77 }
78
79 pa_stream *pa_stream_new_with_proplist(
80         pa_context *c,
81         const char *name,
82         const pa_sample_spec *ss,
83         const pa_channel_map *map,
84         pa_proplist *p) {
85
86     pa_stream *s;
87     int i;
88     pa_channel_map tmap;
89
90     pa_assert(c);
91     pa_assert(PA_REFCNT_VALUE(c) >= 1);
92
93     PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
94     PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
95     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
96     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
97     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
98     PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
99     PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
100
101     if (!map)
102         PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
103
104     s = pa_xnew(pa_stream, 1);
105     PA_REFCNT_INIT(s);
106     s->context = c;
107     s->mainloop = c->mainloop;
108
109     s->direction = PA_STREAM_NODIRECTION;
110     s->state = PA_STREAM_UNCONNECTED;
111     s->flags = 0;
112
113     s->sample_spec = *ss;
114     s->channel_map = *map;
115
116     s->direct_on_input = PA_INVALID_INDEX;
117
118     s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
119     if (name)
120         pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
121
122     s->channel = 0;
123     s->channel_valid = FALSE;
124     s->syncid = c->csyncid++;
125     s->stream_index = PA_INVALID_INDEX;
126
127     s->requested_bytes = 0;
128     memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
129
130     /* We initialize der target length here, so that if the user
131      * passes no explicit buffering metrics the default is similar to
132      * what older PA versions provided. */
133
134     s->buffer_attr.maxlength = (uint32_t) -1;
135     s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
136     s->buffer_attr.minreq = (uint32_t) -1;
137     s->buffer_attr.prebuf = (uint32_t) -1;
138     s->buffer_attr.fragsize = (uint32_t) -1;
139
140     s->device_index = PA_INVALID_INDEX;
141     s->device_name = NULL;
142     s->suspended = FALSE;
143     s->corked = FALSE;
144
145     pa_memchunk_reset(&s->peek_memchunk);
146     s->peek_data = NULL;
147
148     s->record_memblockq = NULL;
149
150
151     memset(&s->timing_info, 0, sizeof(s->timing_info));
152     s->timing_info_valid = FALSE;
153
154     s->previous_time = 0;
155
156     s->read_index_not_before = 0;
157     s->write_index_not_before = 0;
158     for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
159         s->write_index_corrections[i].valid = 0;
160     s->current_write_index_correction = 0;
161
162     s->auto_timing_update_event = NULL;
163     s->auto_timing_update_requested = FALSE;
164
165     reset_callbacks(s);
166
167     s->smoother = NULL;
168
169     /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
170     PA_LLIST_PREPEND(pa_stream, c->streams, s);
171     pa_stream_ref(s);
172
173     return s;
174 }
175
176 static void stream_unlink(pa_stream *s) {
177     pa_operation *o, *n;
178     pa_assert(s);
179
180     if (!s->context)
181         return;
182
183     /* Detach from context */
184
185     /* Unref all operatio object that point to us */
186     for (o = s->context->operations; o; o = n) {
187         n = o->next;
188
189         if (o->stream == s)
190             pa_operation_cancel(o);
191     }
192
193     /* Drop all outstanding replies for this stream */
194     if (s->context->pdispatch)
195         pa_pdispatch_unregister_reply(s->context->pdispatch, s);
196
197     if (s->channel_valid) {
198         pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL);
199         s->channel = 0;
200         s->channel_valid = FALSE;
201     }
202
203     PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
204     pa_stream_unref(s);
205
206     s->context = NULL;
207
208     if (s->auto_timing_update_event) {
209         pa_assert(s->mainloop);
210         s->mainloop->time_free(s->auto_timing_update_event);
211     }
212
213     reset_callbacks(s);
214 }
215
216 static void stream_free(pa_stream *s) {
217     pa_assert(s);
218
219     stream_unlink(s);
220
221     if (s->peek_memchunk.memblock) {
222         if (s->peek_data)
223             pa_memblock_release(s->peek_memchunk.memblock);
224         pa_memblock_unref(s->peek_memchunk.memblock);
225     }
226
227     if (s->record_memblockq)
228         pa_memblockq_free(s->record_memblockq);
229
230     if (s->proplist)
231         pa_proplist_free(s->proplist);
232
233     if (s->smoother)
234         pa_smoother_free(s->smoother);
235
236     pa_xfree(s->device_name);
237     pa_xfree(s);
238 }
239
240 void pa_stream_unref(pa_stream *s) {
241     pa_assert(s);
242     pa_assert(PA_REFCNT_VALUE(s) >= 1);
243
244     if (PA_REFCNT_DEC(s) <= 0)
245         stream_free(s);
246 }
247
248 pa_stream* pa_stream_ref(pa_stream *s) {
249     pa_assert(s);
250     pa_assert(PA_REFCNT_VALUE(s) >= 1);
251
252     PA_REFCNT_INC(s);
253     return s;
254 }
255
256 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
257     pa_assert(s);
258     pa_assert(PA_REFCNT_VALUE(s) >= 1);
259
260     return s->state;
261 }
262
263 pa_context* pa_stream_get_context(pa_stream *s) {
264     pa_assert(s);
265     pa_assert(PA_REFCNT_VALUE(s) >= 1);
266
267     return s->context;
268 }
269
270 uint32_t pa_stream_get_index(pa_stream *s) {
271     pa_assert(s);
272     pa_assert(PA_REFCNT_VALUE(s) >= 1);
273
274     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
275     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
276
277     return s->stream_index;
278 }
279
280 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
281     pa_assert(s);
282     pa_assert(PA_REFCNT_VALUE(s) >= 1);
283
284     if (s->state == st)
285         return;
286
287     pa_stream_ref(s);
288
289     s->state = st;
290
291     if (s->state_callback)
292         s->state_callback(s, s->state_userdata);
293
294     if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
295         stream_unlink(s);
296
297     pa_stream_unref(s);
298 }
299
300 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
301     pa_assert(s);
302     pa_assert(PA_REFCNT_VALUE(s) >= 1);
303
304     if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
305         return;
306
307     if (s->state == PA_STREAM_READY &&
308         (force || !s->auto_timing_update_requested)) {
309         pa_operation *o;
310
311 /*         pa_log("automatically requesting new timing data"); */
312
313         if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
314             pa_operation_unref(o);
315             s->auto_timing_update_requested = TRUE;
316         }
317     }
318
319     if (s->auto_timing_update_event) {
320         struct timeval next;
321         pa_gettimeofday(&next);
322         pa_timeval_add(&next, LATENCY_IPOL_INTERVAL_USEC);
323         s->mainloop->time_restart(s->auto_timing_update_event, &next);
324     }
325 }
326
327 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
328     pa_context *c = userdata;
329     pa_stream *s;
330     uint32_t channel;
331
332     pa_assert(pd);
333     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
334     pa_assert(t);
335     pa_assert(c);
336     pa_assert(PA_REFCNT_VALUE(c) >= 1);
337
338     pa_context_ref(c);
339
340     if (pa_tagstruct_getu32(t, &channel) < 0 ||
341         !pa_tagstruct_eof(t)) {
342         pa_context_fail(c, PA_ERR_PROTOCOL);
343         goto finish;
344     }
345
346     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel)))
347         goto finish;
348
349     if (s->state != PA_STREAM_READY)
350         goto finish;
351
352     pa_context_set_error(c, PA_ERR_KILLED);
353     pa_stream_set_state(s, PA_STREAM_FAILED);
354
355 finish:
356     pa_context_unref(c);
357 }
358
359 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
360     pa_usec_t x;
361
362     pa_assert(s);
363     pa_assert(!force_start || !force_stop);
364
365     if (!s->smoother)
366         return;
367
368     x = pa_rtclock_usec();
369
370     if (s->timing_info_valid) {
371         if (aposteriori)
372             x -= s->timing_info.transport_usec;
373         else
374             x += s->timing_info.transport_usec;
375
376         if (s->direction == PA_STREAM_PLAYBACK)
377             /* it takes a while until the pause/resume is actually
378              * audible */
379             x += s->timing_info.sink_usec;
380         else
381             /* Data froma  while back will be dropped */
382             x -= s->timing_info.source_usec;
383     }
384
385     if (s->suspended || s->corked || force_stop)
386         pa_smoother_pause(s->smoother, x);
387     else if (force_start || s->buffer_attr.prebuf == 0)
388         pa_smoother_resume(s->smoother, x);
389
390     /* Please note that we have no idea if playback actually started
391      * if prebuf is non-zero! */
392 }
393
394 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
395     pa_context *c = userdata;
396     pa_stream *s;
397     uint32_t channel;
398     const char *dn;
399     pa_bool_t suspended;
400     uint32_t di;
401     pa_usec_t usec = 0;
402     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
403
404     pa_assert(pd);
405     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
406     pa_assert(t);
407     pa_assert(c);
408     pa_assert(PA_REFCNT_VALUE(c) >= 1);
409
410     pa_context_ref(c);
411
412     if (c->version < 12) {
413         pa_context_fail(c, PA_ERR_PROTOCOL);
414         goto finish;
415     }
416
417     if (pa_tagstruct_getu32(t, &channel) < 0 ||
418         pa_tagstruct_getu32(t, &di) < 0 ||
419         pa_tagstruct_gets(t, &dn) < 0 ||
420         pa_tagstruct_get_boolean(t, &suspended) < 0) {
421         pa_context_fail(c, PA_ERR_PROTOCOL);
422         goto finish;
423     }
424
425     if (c->version >= 13) {
426
427         if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
428             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
429                 pa_tagstruct_getu32(t, &fragsize) < 0 ||
430                 pa_tagstruct_get_usec(t, &usec) < 0) {
431                 pa_context_fail(c, PA_ERR_PROTOCOL);
432                 goto finish;
433             }
434         } else {
435             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
436                 pa_tagstruct_getu32(t, &tlength) < 0 ||
437                 pa_tagstruct_getu32(t, &prebuf) < 0 ||
438                 pa_tagstruct_getu32(t, &minreq) < 0 ||
439                 pa_tagstruct_get_usec(t, &usec) < 0) {
440                 pa_context_fail(c, PA_ERR_PROTOCOL);
441                 goto finish;
442             }
443         }
444     }
445
446     if (!pa_tagstruct_eof(t)) {
447         pa_context_fail(c, PA_ERR_PROTOCOL);
448         goto finish;
449     }
450
451     if (!dn || di == PA_INVALID_INDEX) {
452         pa_context_fail(c, PA_ERR_PROTOCOL);
453         goto finish;
454     }
455
456     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, channel)))
457         goto finish;
458
459     if (s->state != PA_STREAM_READY)
460         goto finish;
461
462     if (c->version >= 13) {
463         if (s->direction == PA_STREAM_RECORD)
464             s->timing_info.configured_source_usec = usec;
465         else
466             s->timing_info.configured_sink_usec = usec;
467
468         s->buffer_attr.maxlength = maxlength;
469         s->buffer_attr.fragsize = fragsize;
470         s->buffer_attr.tlength = tlength;
471         s->buffer_attr.prebuf = prebuf;
472         s->buffer_attr.minreq = minreq;
473     }
474
475     pa_xfree(s->device_name);
476     s->device_name = pa_xstrdup(dn);
477     s->device_index = di;
478
479     s->suspended = suspended;
480
481     check_smoother_status(s, TRUE, FALSE, FALSE);
482     request_auto_timing_update(s, TRUE);
483
484     if (s->moved_callback)
485         s->moved_callback(s, s->moved_userdata);
486
487 finish:
488     pa_context_unref(c);
489 }
490
491 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
492     pa_context *c = userdata;
493     pa_stream *s;
494     uint32_t channel;
495     pa_usec_t usec = 0;
496     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
497
498     pa_assert(pd);
499     pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
500     pa_assert(t);
501     pa_assert(c);
502     pa_assert(PA_REFCNT_VALUE(c) >= 1);
503
504     pa_context_ref(c);
505
506     if (c->version < 15) {
507         pa_context_fail(c, PA_ERR_PROTOCOL);
508         goto finish;
509     }
510
511     if (pa_tagstruct_getu32(t, &channel) < 0) {
512         pa_context_fail(c, PA_ERR_PROTOCOL);
513         goto finish;
514     }
515
516     if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
517         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
518             pa_tagstruct_getu32(t, &fragsize) < 0 ||
519             pa_tagstruct_get_usec(t, &usec) < 0) {
520             pa_context_fail(c, PA_ERR_PROTOCOL);
521             goto finish;
522         }
523     } else {
524         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
525             pa_tagstruct_getu32(t, &tlength) < 0 ||
526             pa_tagstruct_getu32(t, &prebuf) < 0 ||
527             pa_tagstruct_getu32(t, &minreq) < 0 ||
528             pa_tagstruct_get_usec(t, &usec) < 0) {
529             pa_context_fail(c, PA_ERR_PROTOCOL);
530             goto finish;
531         }
532     }
533
534     if (!pa_tagstruct_eof(t)) {
535         pa_context_fail(c, PA_ERR_PROTOCOL);
536         goto finish;
537     }
538
539     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, channel)))
540         goto finish;
541
542     if (s->state != PA_STREAM_READY)
543         goto finish;
544
545     if (s->direction == PA_STREAM_RECORD)
546         s->timing_info.configured_source_usec = usec;
547     else
548         s->timing_info.configured_sink_usec = usec;
549
550     s->buffer_attr.maxlength = maxlength;
551     s->buffer_attr.fragsize = fragsize;
552     s->buffer_attr.tlength = tlength;
553     s->buffer_attr.prebuf = prebuf;
554     s->buffer_attr.minreq = minreq;
555
556     request_auto_timing_update(s, TRUE);
557
558     if (s->buffer_attr_callback)
559         s->buffer_attr_callback(s, s->buffer_attr_userdata);
560
561 finish:
562     pa_context_unref(c);
563 }
564
565 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
566     pa_context *c = userdata;
567     pa_stream *s;
568     uint32_t channel;
569     pa_bool_t suspended;
570
571     pa_assert(pd);
572     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
573     pa_assert(t);
574     pa_assert(c);
575     pa_assert(PA_REFCNT_VALUE(c) >= 1);
576
577     pa_context_ref(c);
578
579     if (c->version < 12) {
580         pa_context_fail(c, PA_ERR_PROTOCOL);
581         goto finish;
582     }
583
584     if (pa_tagstruct_getu32(t, &channel) < 0 ||
585         pa_tagstruct_get_boolean(t, &suspended) < 0 ||
586         !pa_tagstruct_eof(t)) {
587         pa_context_fail(c, PA_ERR_PROTOCOL);
588         goto finish;
589     }
590
591     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, channel)))
592         goto finish;
593
594     if (s->state != PA_STREAM_READY)
595         goto finish;
596
597     s->suspended = suspended;
598
599     check_smoother_status(s, TRUE, FALSE, FALSE);
600     request_auto_timing_update(s, TRUE);
601
602     if (s->suspended_callback)
603         s->suspended_callback(s, s->suspended_userdata);
604
605 finish:
606     pa_context_unref(c);
607 }
608
609 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
610     pa_context *c = userdata;
611     pa_stream *s;
612     uint32_t channel;
613
614     pa_assert(pd);
615     pa_assert(command == PA_COMMAND_STARTED);
616     pa_assert(t);
617     pa_assert(c);
618     pa_assert(PA_REFCNT_VALUE(c) >= 1);
619
620     pa_context_ref(c);
621
622     if (c->version < 13) {
623         pa_context_fail(c, PA_ERR_PROTOCOL);
624         goto finish;
625     }
626
627     if (pa_tagstruct_getu32(t, &channel) < 0 ||
628         !pa_tagstruct_eof(t)) {
629         pa_context_fail(c, PA_ERR_PROTOCOL);
630         goto finish;
631     }
632
633     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
634         goto finish;
635
636     if (s->state != PA_STREAM_READY)
637         goto finish;
638
639     check_smoother_status(s, TRUE, TRUE, FALSE);
640     request_auto_timing_update(s, TRUE);
641
642     if (s->started_callback)
643         s->started_callback(s, s->started_userdata);
644
645 finish:
646     pa_context_unref(c);
647 }
648
649 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
650     pa_context *c = userdata;
651     pa_stream *s;
652     uint32_t channel;
653     pa_proplist *pl = NULL;
654     const char *event;
655
656     pa_assert(pd);
657     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
658     pa_assert(t);
659     pa_assert(c);
660     pa_assert(PA_REFCNT_VALUE(c) >= 1);
661
662     pa_context_ref(c);
663
664     if (c->version < 15) {
665         pa_context_fail(c, PA_ERR_PROTOCOL);
666         goto finish;
667     }
668
669     pl = pa_proplist_new();
670
671     if (pa_tagstruct_getu32(t, &channel) < 0 ||
672         pa_tagstruct_gets(t, &event) < 0 ||
673         pa_tagstruct_get_proplist(t, pl) < 0 ||
674         !pa_tagstruct_eof(t) || !event) {
675         pa_context_fail(c, PA_ERR_PROTOCOL);
676         goto finish;
677     }
678
679     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, channel)))
680         goto finish;
681
682     if (s->state != PA_STREAM_READY)
683         goto finish;
684
685     if (s->event_callback)
686         s->event_callback(s, event, pl, s->event_userdata);
687
688 finish:
689     pa_context_unref(c);
690
691     if (pl)
692         pa_proplist_free(pl);
693 }
694
695 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
696     pa_stream *s;
697     pa_context *c = userdata;
698     uint32_t bytes, channel;
699
700     pa_assert(pd);
701     pa_assert(command == PA_COMMAND_REQUEST);
702     pa_assert(t);
703     pa_assert(c);
704     pa_assert(PA_REFCNT_VALUE(c) >= 1);
705
706     pa_context_ref(c);
707
708     if (pa_tagstruct_getu32(t, &channel) < 0 ||
709         pa_tagstruct_getu32(t, &bytes) < 0 ||
710         !pa_tagstruct_eof(t)) {
711         pa_context_fail(c, PA_ERR_PROTOCOL);
712         goto finish;
713     }
714
715     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
716         goto finish;
717
718     if (s->state != PA_STREAM_READY)
719         goto finish;
720
721     s->requested_bytes += bytes;
722
723     if (s->requested_bytes > 0 && s->write_callback)
724         s->write_callback(s, s->requested_bytes, s->write_userdata);
725
726 finish:
727     pa_context_unref(c);
728 }
729
730 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
731     pa_stream *s;
732     pa_context *c = userdata;
733     uint32_t channel;
734
735     pa_assert(pd);
736     pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
737     pa_assert(t);
738     pa_assert(c);
739     pa_assert(PA_REFCNT_VALUE(c) >= 1);
740
741     pa_context_ref(c);
742
743     if (pa_tagstruct_getu32(t, &channel) < 0 ||
744         !pa_tagstruct_eof(t)) {
745         pa_context_fail(c, PA_ERR_PROTOCOL);
746         goto finish;
747     }
748
749     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
750         goto finish;
751
752     if (s->state != PA_STREAM_READY)
753         goto finish;
754
755     if (s->buffer_attr.prebuf > 0)
756         check_smoother_status(s, TRUE, FALSE, TRUE);
757
758     request_auto_timing_update(s, TRUE);
759
760     if (command == PA_COMMAND_OVERFLOW) {
761         if (s->overflow_callback)
762             s->overflow_callback(s, s->overflow_userdata);
763     } else if (command == PA_COMMAND_UNDERFLOW) {
764         if (s->underflow_callback)
765             s->underflow_callback(s, s->underflow_userdata);
766     }
767
768  finish:
769     pa_context_unref(c);
770 }
771
772 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
773     pa_assert(s);
774     pa_assert(PA_REFCNT_VALUE(s) >= 1);
775
776 /*     pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
777
778     if (s->state != PA_STREAM_READY)
779         return;
780
781     if (w) {
782         s->write_index_not_before = s->context->ctag;
783
784         if (s->timing_info_valid)
785             s->timing_info.write_index_corrupt = TRUE;
786
787 /*         pa_log("write_index invalidated"); */
788     }
789
790     if (r) {
791         s->read_index_not_before = s->context->ctag;
792
793         if (s->timing_info_valid)
794             s->timing_info.read_index_corrupt = TRUE;
795
796 /*         pa_log("read_index invalidated"); */
797     }
798
799     request_auto_timing_update(s, TRUE);
800 }
801
802 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *tv, void *userdata) {
803     pa_stream *s = userdata;
804
805     pa_assert(s);
806     pa_assert(PA_REFCNT_VALUE(s) >= 1);
807
808     pa_stream_ref(s);
809     request_auto_timing_update(s, FALSE);
810     pa_stream_unref(s);
811 }
812
813 static void create_stream_complete(pa_stream *s) {
814     pa_assert(s);
815     pa_assert(PA_REFCNT_VALUE(s) >= 1);
816     pa_assert(s->state == PA_STREAM_CREATING);
817
818     pa_stream_set_state(s, PA_STREAM_READY);
819
820     if (s->requested_bytes > 0 && s->write_callback)
821         s->write_callback(s, s->requested_bytes, s->write_userdata);
822
823     if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
824         struct timeval tv;
825         pa_gettimeofday(&tv);
826         tv.tv_usec += (suseconds_t) LATENCY_IPOL_INTERVAL_USEC; /* every 100 ms */
827         pa_assert(!s->auto_timing_update_event);
828         s->auto_timing_update_event = s->mainloop->time_new(s->mainloop, &tv, &auto_timing_update_callback, s);
829
830         request_auto_timing_update(s, TRUE);
831     }
832
833     check_smoother_status(s, TRUE, FALSE, FALSE);
834 }
835
836 static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_sample_spec *ss) {
837     pa_assert(s);
838     pa_assert(attr);
839     pa_assert(ss);
840
841     if (s->context->version >= 13)
842         return;
843
844     /* Version older than 0.9.10 didn't do server side buffer_attr
845      * selection, hence we have to fake it on the client side. */
846
847     /* We choose fairly conservative values here, to not confuse
848      * old clients with extremely large playback buffers */
849
850     if (attr->maxlength == (uint32_t) -1)
851         attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
852
853     if (attr->tlength == (uint32_t) -1)
854         attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
855
856     if (attr->minreq == (uint32_t) -1)
857         attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
858
859     if (attr->prebuf == (uint32_t) -1)
860         attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
861
862     if (attr->fragsize == (uint32_t) -1)
863         attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
864 }
865
866 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
867     pa_stream *s = userdata;
868
869     pa_assert(pd);
870     pa_assert(s);
871     pa_assert(PA_REFCNT_VALUE(s) >= 1);
872     pa_assert(s->state == PA_STREAM_CREATING);
873
874     pa_stream_ref(s);
875
876     if (command != PA_COMMAND_REPLY) {
877         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
878             goto finish;
879
880         pa_stream_set_state(s, PA_STREAM_FAILED);
881         goto finish;
882     }
883
884     if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
885         s->channel == PA_INVALID_INDEX ||
886         ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 ||  s->stream_index == PA_INVALID_INDEX)) ||
887         ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &s->requested_bytes) < 0)) {
888         pa_context_fail(s->context, PA_ERR_PROTOCOL);
889         goto finish;
890     }
891
892     if (s->context->version >= 9) {
893         if (s->direction == PA_STREAM_PLAYBACK) {
894             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
895                 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
896                 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
897                 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
898                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
899                 goto finish;
900             }
901         } else if (s->direction == PA_STREAM_RECORD) {
902             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
903                 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
904                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
905                 goto finish;
906             }
907         }
908     }
909
910     if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
911         pa_sample_spec ss;
912         pa_channel_map cm;
913         const char *dn = NULL;
914         pa_bool_t suspended;
915
916         if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
917             pa_tagstruct_get_channel_map(t, &cm) < 0 ||
918             pa_tagstruct_getu32(t, &s->device_index) < 0 ||
919             pa_tagstruct_gets(t, &dn) < 0 ||
920             pa_tagstruct_get_boolean(t, &suspended) < 0) {
921             pa_context_fail(s->context, PA_ERR_PROTOCOL);
922             goto finish;
923         }
924
925         if (!dn || s->device_index == PA_INVALID_INDEX ||
926             ss.channels != cm.channels ||
927             !pa_channel_map_valid(&cm) ||
928             !pa_sample_spec_valid(&ss) ||
929             (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
930             (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
931             (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))) {
932             pa_context_fail(s->context, PA_ERR_PROTOCOL);
933             goto finish;
934         }
935
936         pa_xfree(s->device_name);
937         s->device_name = pa_xstrdup(dn);
938         s->suspended = suspended;
939
940         s->channel_map = cm;
941         s->sample_spec = ss;
942     }
943
944     if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
945         pa_usec_t usec;
946
947         if (pa_tagstruct_get_usec(t, &usec) < 0) {
948             pa_context_fail(s->context, PA_ERR_PROTOCOL);
949             goto finish;
950         }
951
952         if (s->direction == PA_STREAM_RECORD)
953             s->timing_info.configured_source_usec = usec;
954         else
955             s->timing_info.configured_sink_usec = usec;
956     }
957
958     if (!pa_tagstruct_eof(t)) {
959         pa_context_fail(s->context, PA_ERR_PROTOCOL);
960         goto finish;
961     }
962
963     if (s->direction == PA_STREAM_RECORD) {
964         pa_assert(!s->record_memblockq);
965
966         s->record_memblockq = pa_memblockq_new(
967                 0,
968                 s->buffer_attr.maxlength,
969                 0,
970                 pa_frame_size(&s->sample_spec),
971                 1,
972                 0,
973                 0,
974                 NULL);
975     }
976
977     s->channel_valid = TRUE;
978     pa_dynarray_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, s->channel, s);
979
980     create_stream_complete(s);
981
982 finish:
983     pa_stream_unref(s);
984 }
985
986 static int create_stream(
987         pa_stream_direction_t direction,
988         pa_stream *s,
989         const char *dev,
990         const pa_buffer_attr *attr,
991         pa_stream_flags_t flags,
992         const pa_cvolume *volume,
993         pa_stream *sync_stream) {
994
995     pa_tagstruct *t;
996     uint32_t tag;
997     pa_bool_t volume_set = FALSE;
998
999     pa_assert(s);
1000     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1001     pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1002
1003     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1004     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1005     PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1006     PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1007                                               PA_STREAM_INTERPOLATE_TIMING|
1008                                               PA_STREAM_NOT_MONOTONIC|
1009                                               PA_STREAM_AUTO_TIMING_UPDATE|
1010                                               PA_STREAM_NO_REMAP_CHANNELS|
1011                                               PA_STREAM_NO_REMIX_CHANNELS|
1012                                               PA_STREAM_FIX_FORMAT|
1013                                               PA_STREAM_FIX_RATE|
1014                                               PA_STREAM_FIX_CHANNELS|
1015                                               PA_STREAM_DONT_MOVE|
1016                                               PA_STREAM_VARIABLE_RATE|
1017                                               PA_STREAM_PEAK_DETECT|
1018                                               PA_STREAM_START_MUTED|
1019                                               PA_STREAM_ADJUST_LATENCY|
1020                                               PA_STREAM_EARLY_REQUESTS|
1021                                               PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1022                                               PA_STREAM_START_UNMUTED|
1023                                               PA_STREAM_FAIL_ON_SUSPEND)), PA_ERR_INVALID);
1024
1025     PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1026     PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1027     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1028     /* Althought some of the other flags are not supported on older
1029      * version, we don't check for them here, because it doesn't hurt
1030      * when they are passed but actually not supported. This makes
1031      * client development easier */
1032
1033     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1034     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1035     PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
1036     PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1037     PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1038
1039     pa_stream_ref(s);
1040
1041     s->direction = direction;
1042     s->flags = flags;
1043     s->corked = !!(flags & PA_STREAM_START_CORKED);
1044
1045     if (sync_stream)
1046         s->syncid = sync_stream->syncid;
1047
1048     if (attr)
1049         s->buffer_attr = *attr;
1050     automatic_buffer_attr(s, &s->buffer_attr, &s->sample_spec);
1051
1052     if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1053         pa_usec_t x;
1054
1055         if (s->smoother)
1056             pa_smoother_free(s->smoother);
1057
1058         s->smoother = pa_smoother_new(SMOOTHER_ADJUST_TIME, SMOOTHER_HISTORY_TIME, !(flags & PA_STREAM_NOT_MONOTONIC), SMOOTHER_MIN_HISTORY);
1059
1060         x = pa_rtclock_usec();
1061         pa_smoother_set_time_offset(s->smoother, x);
1062         pa_smoother_pause(s->smoother, x);
1063     }
1064
1065     if (!dev)
1066         dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1067
1068     t = pa_tagstruct_command(
1069             s->context,
1070             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1071             &tag);
1072
1073     if (s->context->version < 13)
1074         pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1075
1076     pa_tagstruct_put(
1077             t,
1078             PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1079             PA_TAG_CHANNEL_MAP, &s->channel_map,
1080             PA_TAG_U32, PA_INVALID_INDEX,
1081             PA_TAG_STRING, dev,
1082             PA_TAG_U32, s->buffer_attr.maxlength,
1083             PA_TAG_BOOLEAN, s->corked,
1084             PA_TAG_INVALID);
1085
1086     if (s->direction == PA_STREAM_PLAYBACK) {
1087         pa_cvolume cv;
1088
1089         pa_tagstruct_put(
1090                 t,
1091                 PA_TAG_U32, s->buffer_attr.tlength,
1092                 PA_TAG_U32, s->buffer_attr.prebuf,
1093                 PA_TAG_U32, s->buffer_attr.minreq,
1094                 PA_TAG_U32, s->syncid,
1095                 PA_TAG_INVALID);
1096
1097         volume_set = !!volume;
1098
1099         if (!volume)
1100             volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1101
1102         pa_tagstruct_put_cvolume(t, volume);
1103     } else
1104         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1105
1106     if (s->context->version >= 12) {
1107         pa_tagstruct_put(
1108                 t,
1109                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1110                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1111                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1112                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1113                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1114                 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1115                 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1116                 PA_TAG_INVALID);
1117     }
1118
1119     if (s->context->version >= 13) {
1120
1121         if (s->direction == PA_STREAM_PLAYBACK)
1122             pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1123         else
1124             pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1125
1126         pa_tagstruct_put(
1127                 t,
1128                 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1129                 PA_TAG_PROPLIST, s->proplist,
1130                 PA_TAG_INVALID);
1131
1132         if (s->direction == PA_STREAM_RECORD)
1133             pa_tagstruct_putu32(t, s->direct_on_input);
1134     }
1135
1136     if (s->context->version >= 14) {
1137
1138         if (s->direction == PA_STREAM_PLAYBACK)
1139             pa_tagstruct_put_boolean(t, volume_set);
1140
1141         pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1142     }
1143
1144     if (s->context->version >= 15) {
1145
1146         if (s->direction == PA_STREAM_PLAYBACK)
1147             pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1148
1149         pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1150         pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1151     }
1152
1153     pa_pstream_send_tagstruct(s->context->pstream, t);
1154     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1155
1156     pa_stream_set_state(s, PA_STREAM_CREATING);
1157
1158     pa_stream_unref(s);
1159     return 0;
1160 }
1161
1162 int pa_stream_connect_playback(
1163         pa_stream *s,
1164         const char *dev,
1165         const pa_buffer_attr *attr,
1166         pa_stream_flags_t flags,
1167         pa_cvolume *volume,
1168         pa_stream *sync_stream) {
1169
1170     pa_assert(s);
1171     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1172
1173     return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1174 }
1175
1176 int pa_stream_connect_record(
1177         pa_stream *s,
1178         const char *dev,
1179         const pa_buffer_attr *attr,
1180         pa_stream_flags_t flags) {
1181
1182     pa_assert(s);
1183     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1184
1185     return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1186 }
1187
1188 int pa_stream_write(
1189         pa_stream *s,
1190         const void *data,
1191         size_t length,
1192         void (*free_cb)(void *p),
1193         int64_t offset,
1194         pa_seek_mode_t seek) {
1195
1196     pa_memchunk chunk;
1197     pa_seek_mode_t t_seek;
1198     int64_t t_offset;
1199     size_t t_length;
1200     const void *t_data;
1201
1202     pa_assert(s);
1203     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1204     pa_assert(data);
1205
1206     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1207     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1208     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1209     PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1210     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1211
1212     if (length <= 0)
1213         return 0;
1214
1215     t_seek = seek;
1216     t_offset = offset;
1217     t_length = length;
1218     t_data = data;
1219
1220     while (t_length > 0) {
1221
1222         chunk.index = 0;
1223
1224         if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1225             chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1226             chunk.length = t_length;
1227         } else {
1228             void *d;
1229
1230             chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1231             chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1232
1233             d = pa_memblock_acquire(chunk.memblock);
1234             memcpy(d, t_data, chunk.length);
1235             pa_memblock_release(chunk.memblock);
1236         }
1237
1238         pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1239
1240         t_offset = 0;
1241         t_seek = PA_SEEK_RELATIVE;
1242
1243         t_data = (const uint8_t*) t_data + chunk.length;
1244         t_length -= chunk.length;
1245
1246         pa_memblock_unref(chunk.memblock);
1247     }
1248
1249     if (free_cb && pa_pstream_get_shm(s->context->pstream))
1250         free_cb((void*) data);
1251
1252     if (length < s->requested_bytes)
1253         s->requested_bytes -= (uint32_t) length;
1254     else
1255         s->requested_bytes = 0;
1256
1257     /* FIXME!!! ^^^ will break when offset is != 0 and mode is not RELATIVE*/
1258
1259     if (s->direction == PA_STREAM_PLAYBACK) {
1260
1261         /* Update latency request correction */
1262         if (s->write_index_corrections[s->current_write_index_correction].valid) {
1263
1264             if (seek == PA_SEEK_ABSOLUTE) {
1265                 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1266                 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1267                 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1268             } else if (seek == PA_SEEK_RELATIVE) {
1269                 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1270                     s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1271             } else
1272                 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1273         }
1274
1275         /* Update the write index in the already available latency data */
1276         if (s->timing_info_valid) {
1277
1278             if (seek == PA_SEEK_ABSOLUTE) {
1279                 s->timing_info.write_index_corrupt = FALSE;
1280                 s->timing_info.write_index = offset + (int64_t) length;
1281             } else if (seek == PA_SEEK_RELATIVE) {
1282                 if (!s->timing_info.write_index_corrupt)
1283                     s->timing_info.write_index += offset + (int64_t) length;
1284             } else
1285                 s->timing_info.write_index_corrupt = TRUE;
1286         }
1287
1288         if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1289             request_auto_timing_update(s, TRUE);
1290     }
1291
1292     return 0;
1293 }
1294
1295 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1296     pa_assert(s);
1297     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1298     pa_assert(data);
1299     pa_assert(length);
1300
1301     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1302     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1303     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1304
1305     if (!s->peek_memchunk.memblock) {
1306
1307         if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1308             *data = NULL;
1309             *length = 0;
1310             return 0;
1311         }
1312
1313         s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1314     }
1315
1316     pa_assert(s->peek_data);
1317     *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1318     *length = s->peek_memchunk.length;
1319     return 0;
1320 }
1321
1322 int pa_stream_drop(pa_stream *s) {
1323     pa_assert(s);
1324     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1325
1326     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1327     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1328     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1329     PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1330
1331     pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1332
1333     /* Fix the simulated local read index */
1334     if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1335         s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1336
1337     pa_assert(s->peek_data);
1338     pa_memblock_release(s->peek_memchunk.memblock);
1339     pa_memblock_unref(s->peek_memchunk.memblock);
1340     pa_memchunk_reset(&s->peek_memchunk);
1341
1342     return 0;
1343 }
1344
1345 size_t pa_stream_writable_size(pa_stream *s) {
1346     pa_assert(s);
1347     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1348
1349     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1350     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1351     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1352
1353     return s->requested_bytes;
1354 }
1355
1356 size_t pa_stream_readable_size(pa_stream *s) {
1357     pa_assert(s);
1358     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1359
1360     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1361     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1362     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1363
1364     return pa_memblockq_get_length(s->record_memblockq);
1365 }
1366
1367 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1368     pa_operation *o;
1369     pa_tagstruct *t;
1370     uint32_t tag;
1371
1372     pa_assert(s);
1373     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1374
1375     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1376     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1377     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1378
1379     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1380
1381     t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1382     pa_tagstruct_putu32(t, s->channel);
1383     pa_pstream_send_tagstruct(s->context->pstream, t);
1384     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1385
1386     return o;
1387 }
1388
1389 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1390     pa_usec_t usec;
1391
1392     pa_assert(s);
1393     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1394     pa_assert(s->state == PA_STREAM_READY);
1395     pa_assert(s->direction != PA_STREAM_UPLOAD);
1396     pa_assert(s->timing_info_valid);
1397     pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1398     pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1399
1400     if (s->direction == PA_STREAM_PLAYBACK) {
1401         /* The last byte that was written into the output device
1402          * had this time value associated */
1403         usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1404
1405         if (!s->corked && !s->suspended) {
1406
1407             if (!ignore_transport)
1408                 /* Because the latency info took a little time to come
1409                  * to us, we assume that the real output time is actually
1410                  * a little ahead */
1411                 usec += s->timing_info.transport_usec;
1412
1413             /* However, the output device usually maintains a buffer
1414                too, hence the real sample currently played is a little
1415                back  */
1416             if (s->timing_info.sink_usec >= usec)
1417                 usec = 0;
1418             else
1419                 usec -= s->timing_info.sink_usec;
1420         }
1421
1422     } else {
1423         pa_assert(s->direction == PA_STREAM_RECORD);
1424
1425         /* The last byte written into the server side queue had
1426          * this time value associated */
1427         usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1428
1429         if (!s->corked && !s->suspended) {
1430
1431             if (!ignore_transport)
1432                 /* Add transport latency */
1433                 usec += s->timing_info.transport_usec;
1434
1435             /* Add latency of data in device buffer */
1436             usec += s->timing_info.source_usec;
1437
1438             /* If this is a monitor source, we need to correct the
1439              * time by the playback device buffer */
1440             if (s->timing_info.sink_usec >= usec)
1441                 usec = 0;
1442             else
1443                 usec -= s->timing_info.sink_usec;
1444         }
1445     }
1446
1447     return usec;
1448 }
1449
1450 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1451     pa_operation *o = userdata;
1452     struct timeval local, remote, now;
1453     pa_timing_info *i;
1454     pa_bool_t playing = FALSE;
1455     uint64_t underrun_for = 0, playing_for = 0;
1456
1457     pa_assert(pd);
1458     pa_assert(o);
1459     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1460
1461     if (!o->context || !o->stream)
1462         goto finish;
1463
1464     i = &o->stream->timing_info;
1465
1466     o->stream->timing_info_valid = FALSE;
1467     i->write_index_corrupt = TRUE;
1468     i->read_index_corrupt = TRUE;
1469
1470     if (command != PA_COMMAND_REPLY) {
1471         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1472             goto finish;
1473
1474     } else {
1475
1476         if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1477             pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1478             pa_tagstruct_get_boolean(t, &playing) < 0 ||
1479             pa_tagstruct_get_timeval(t, &local) < 0 ||
1480             pa_tagstruct_get_timeval(t, &remote) < 0 ||
1481             pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1482             pa_tagstruct_gets64(t, &i->read_index) < 0) {
1483
1484             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1485             goto finish;
1486         }
1487
1488         if (o->context->version >= 13 &&
1489             o->stream->direction == PA_STREAM_PLAYBACK)
1490             if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1491                 pa_tagstruct_getu64(t, &playing_for) < 0) {
1492
1493                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1494                 goto finish;
1495             }
1496
1497
1498         if (!pa_tagstruct_eof(t)) {
1499             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1500             goto finish;
1501         }
1502         o->stream->timing_info_valid = TRUE;
1503         i->write_index_corrupt = FALSE;
1504         i->read_index_corrupt = FALSE;
1505
1506         i->playing = (int) playing;
1507         i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1508
1509         pa_gettimeofday(&now);
1510
1511         /* Calculcate timestamps */
1512         if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1513             /* local and remote seem to have synchronized clocks */
1514
1515             if (o->stream->direction == PA_STREAM_PLAYBACK)
1516                 i->transport_usec = pa_timeval_diff(&remote, &local);
1517             else
1518                 i->transport_usec = pa_timeval_diff(&now, &remote);
1519
1520             i->synchronized_clocks = TRUE;
1521             i->timestamp = remote;
1522         } else {
1523             /* clocks are not synchronized, let's estimate latency then */
1524             i->transport_usec = pa_timeval_diff(&now, &local)/2;
1525             i->synchronized_clocks = FALSE;
1526             i->timestamp = local;
1527             pa_timeval_add(&i->timestamp, i->transport_usec);
1528         }
1529
1530         /* Invalidate read and write indexes if necessary */
1531         if (tag < o->stream->read_index_not_before)
1532             i->read_index_corrupt = TRUE;
1533
1534         if (tag < o->stream->write_index_not_before)
1535             i->write_index_corrupt = TRUE;
1536
1537         if (o->stream->direction == PA_STREAM_PLAYBACK) {
1538             /* Write index correction */
1539
1540             int n, j;
1541             uint32_t ctag = tag;
1542
1543             /* Go through the saved correction values and add up the
1544              * total correction.*/
1545             for (n = 0, j = o->stream->current_write_index_correction+1;
1546                  n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1547                  n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1548
1549                 /* Step over invalid data or out-of-date data */
1550                 if (!o->stream->write_index_corrections[j].valid ||
1551                     o->stream->write_index_corrections[j].tag < ctag)
1552                     continue;
1553
1554                 /* Make sure that everything is in order */
1555                 ctag = o->stream->write_index_corrections[j].tag+1;
1556
1557                 /* Now fix the write index */
1558                 if (o->stream->write_index_corrections[j].corrupt) {
1559                     /* A corrupting seek was made */
1560                     i->write_index_corrupt = TRUE;
1561                 } else if (o->stream->write_index_corrections[j].absolute) {
1562                     /* An absolute seek was made */
1563                     i->write_index = o->stream->write_index_corrections[j].value;
1564                     i->write_index_corrupt = FALSE;
1565                 } else if (!i->write_index_corrupt) {
1566                     /* A relative seek was made */
1567                     i->write_index += o->stream->write_index_corrections[j].value;
1568                 }
1569             }
1570
1571             /* Clear old correction entries */
1572             for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1573                 if (!o->stream->write_index_corrections[n].valid)
1574                     continue;
1575
1576                 if (o->stream->write_index_corrections[n].tag <= tag)
1577                     o->stream->write_index_corrections[n].valid = FALSE;
1578             }
1579         }
1580
1581         if (o->stream->direction == PA_STREAM_RECORD) {
1582             /* Read index correction */
1583
1584             if (!i->read_index_corrupt)
1585                 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1586         }
1587
1588         /* Update smoother */
1589         if (o->stream->smoother) {
1590             pa_usec_t u, x;
1591
1592             u = x = pa_rtclock_usec() - i->transport_usec;
1593
1594             if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1595                 pa_usec_t su;
1596
1597                 /* If we weren't playing then it will take some time
1598                  * until the audio will actually come out through the
1599                  * speakers. Since we follow that timing here, we need
1600                  * to try to fix this up */
1601
1602                 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1603
1604                 if (su < i->sink_usec)
1605                     x += i->sink_usec - su;
1606             }
1607
1608             if (!i->playing)
1609                 pa_smoother_pause(o->stream->smoother, x);
1610
1611             /* Update the smoother */
1612             if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1613                 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1614                 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1615
1616             if (i->playing)
1617                 pa_smoother_resume(o->stream->smoother, x);
1618         }
1619     }
1620
1621     o->stream->auto_timing_update_requested = FALSE;
1622
1623     if (o->stream->latency_update_callback)
1624         o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1625
1626     if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1627         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1628         cb(o->stream, o->stream->timing_info_valid, o->userdata);
1629     }
1630
1631 finish:
1632
1633     pa_operation_done(o);
1634     pa_operation_unref(o);
1635 }
1636
1637 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1638     uint32_t tag;
1639     pa_operation *o;
1640     pa_tagstruct *t;
1641     struct timeval now;
1642     int cidx = 0;
1643
1644     pa_assert(s);
1645     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1646
1647     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1648     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1649     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1650
1651     if (s->direction == PA_STREAM_PLAYBACK) {
1652         /* Find a place to store the write_index correction data for this entry */
1653         cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1654
1655         /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1656         PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1657     }
1658     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1659
1660     t = pa_tagstruct_command(
1661             s->context,
1662             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1663             &tag);
1664     pa_tagstruct_putu32(t, s->channel);
1665     pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1666
1667     pa_pstream_send_tagstruct(s->context->pstream, t);
1668     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1669
1670     if (s->direction == PA_STREAM_PLAYBACK) {
1671         /* Fill in initial correction data */
1672
1673         s->current_write_index_correction = cidx;
1674
1675         s->write_index_corrections[cidx].valid = TRUE;
1676         s->write_index_corrections[cidx].absolute = FALSE;
1677         s->write_index_corrections[cidx].corrupt = FALSE;
1678         s->write_index_corrections[cidx].tag = tag;
1679         s->write_index_corrections[cidx].value = 0;
1680     }
1681
1682     return o;
1683 }
1684
1685 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1686     pa_stream *s = userdata;
1687
1688     pa_assert(pd);
1689     pa_assert(s);
1690     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1691
1692     pa_stream_ref(s);
1693
1694     if (command != PA_COMMAND_REPLY) {
1695         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1696             goto finish;
1697
1698         pa_stream_set_state(s, PA_STREAM_FAILED);
1699         goto finish;
1700     } else if (!pa_tagstruct_eof(t)) {
1701         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1702         goto finish;
1703     }
1704
1705     pa_stream_set_state(s, PA_STREAM_TERMINATED);
1706
1707 finish:
1708     pa_stream_unref(s);
1709 }
1710
1711 int pa_stream_disconnect(pa_stream *s) {
1712     pa_tagstruct *t;
1713     uint32_t tag;
1714
1715     pa_assert(s);
1716     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1717
1718     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1719     PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1720     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1721
1722     pa_stream_ref(s);
1723
1724     t = pa_tagstruct_command(
1725             s->context,
1726             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1727                         (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1728             &tag);
1729     pa_tagstruct_putu32(t, s->channel);
1730     pa_pstream_send_tagstruct(s->context->pstream, t);
1731     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1732
1733     pa_stream_unref(s);
1734     return 0;
1735 }
1736
1737 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1738     pa_assert(s);
1739     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1740
1741     if (pa_detect_fork())
1742         return;
1743
1744     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1745         return;
1746
1747     s->read_callback = cb;
1748     s->read_userdata = userdata;
1749 }
1750
1751 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1752     pa_assert(s);
1753     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1754
1755     if (pa_detect_fork())
1756         return;
1757
1758     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1759         return;
1760
1761     s->write_callback = cb;
1762     s->write_userdata = userdata;
1763 }
1764
1765 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1766     pa_assert(s);
1767     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1768
1769     if (pa_detect_fork())
1770         return;
1771
1772     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1773         return;
1774
1775     s->state_callback = cb;
1776     s->state_userdata = userdata;
1777 }
1778
1779 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1780     pa_assert(s);
1781     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1782
1783     if (pa_detect_fork())
1784         return;
1785
1786     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1787         return;
1788
1789     s->overflow_callback = cb;
1790     s->overflow_userdata = userdata;
1791 }
1792
1793 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1794     pa_assert(s);
1795     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1796
1797     if (pa_detect_fork())
1798         return;
1799
1800     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1801         return;
1802
1803     s->underflow_callback = cb;
1804     s->underflow_userdata = userdata;
1805 }
1806
1807 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1808     pa_assert(s);
1809     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1810
1811     if (pa_detect_fork())
1812         return;
1813
1814     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1815         return;
1816
1817     s->latency_update_callback = cb;
1818     s->latency_update_userdata = userdata;
1819 }
1820
1821 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1822     pa_assert(s);
1823     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1824
1825     if (pa_detect_fork())
1826         return;
1827
1828     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1829         return;
1830
1831     s->moved_callback = cb;
1832     s->moved_userdata = userdata;
1833 }
1834
1835 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1836     pa_assert(s);
1837     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1838
1839     if (pa_detect_fork())
1840         return;
1841
1842     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1843         return;
1844
1845     s->suspended_callback = cb;
1846     s->suspended_userdata = userdata;
1847 }
1848
1849 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1850     pa_assert(s);
1851     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1852
1853     if (pa_detect_fork())
1854         return;
1855
1856     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1857         return;
1858
1859     s->started_callback = cb;
1860     s->started_userdata = userdata;
1861 }
1862
1863 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
1864     pa_assert(s);
1865     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1866
1867     if (pa_detect_fork())
1868         return;
1869
1870     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1871         return;
1872
1873     s->event_callback = cb;
1874     s->event_userdata = userdata;
1875 }
1876
1877 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1878     pa_assert(s);
1879     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1880
1881     if (pa_detect_fork())
1882         return;
1883
1884     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1885         return;
1886
1887     s->buffer_attr_callback = cb;
1888     s->buffer_attr_userdata = userdata;
1889 }
1890
1891 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1892     pa_operation *o = userdata;
1893     int success = 1;
1894
1895     pa_assert(pd);
1896     pa_assert(o);
1897     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1898
1899     if (!o->context)
1900         goto finish;
1901
1902     if (command != PA_COMMAND_REPLY) {
1903         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1904             goto finish;
1905
1906         success = 0;
1907     } else if (!pa_tagstruct_eof(t)) {
1908         pa_context_fail(o->context, PA_ERR_PROTOCOL);
1909         goto finish;
1910     }
1911
1912     if (o->callback) {
1913         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1914         cb(o->stream, success, o->userdata);
1915     }
1916
1917 finish:
1918     pa_operation_done(o);
1919     pa_operation_unref(o);
1920 }
1921
1922 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
1923     pa_operation *o;
1924     pa_tagstruct *t;
1925     uint32_t tag;
1926
1927     pa_assert(s);
1928     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1929
1930     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1931     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1932     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1933
1934     s->corked = b;
1935
1936     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1937
1938     t = pa_tagstruct_command(
1939             s->context,
1940             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
1941             &tag);
1942     pa_tagstruct_putu32(t, s->channel);
1943     pa_tagstruct_put_boolean(t, !!b);
1944     pa_pstream_send_tagstruct(s->context->pstream, t);
1945     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1946
1947     check_smoother_status(s, FALSE, FALSE, FALSE);
1948
1949     /* This might cause the indexes to hang/start again, hence
1950      * let's request a timing update */
1951     request_auto_timing_update(s, TRUE);
1952
1953     return o;
1954 }
1955
1956 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
1957     pa_tagstruct *t;
1958     pa_operation *o;
1959     uint32_t tag;
1960
1961     pa_assert(s);
1962     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1963
1964     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1965     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1966
1967     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1968
1969     t = pa_tagstruct_command(s->context, command, &tag);
1970     pa_tagstruct_putu32(t, s->channel);
1971     pa_pstream_send_tagstruct(s->context->pstream, t);
1972     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1973
1974     return o;
1975 }
1976
1977 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1978     pa_operation *o;
1979
1980     pa_assert(s);
1981     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1982
1983     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1984     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1985     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1986
1987     if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
1988         return NULL;
1989
1990     if (s->direction == PA_STREAM_PLAYBACK) {
1991
1992         if (s->write_index_corrections[s->current_write_index_correction].valid)
1993             s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1994
1995         if (s->buffer_attr.prebuf > 0)
1996             check_smoother_status(s, FALSE, FALSE, TRUE);
1997
1998         /* This will change the write index, but leave the
1999          * read index untouched. */
2000         invalidate_indexes(s, FALSE, TRUE);
2001
2002     } else
2003         /* For record streams this has no influence on the write
2004          * index, but the read index might jump. */
2005         invalidate_indexes(s, TRUE, FALSE);
2006
2007     return o;
2008 }
2009
2010 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2011     pa_operation *o;
2012
2013     pa_assert(s);
2014     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2015
2016     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2017     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2018     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2019     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2020
2021     if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2022         return NULL;
2023
2024     /* This might cause the read index to hang again, hence
2025      * let's request a timing update */
2026     request_auto_timing_update(s, TRUE);
2027
2028     return o;
2029 }
2030
2031 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2032     pa_operation *o;
2033
2034     pa_assert(s);
2035     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2036
2037     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2038     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2039     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2040     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2041
2042     if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2043         return NULL;
2044
2045     /* This might cause the read index to start moving again, hence
2046      * let's request a timing update */
2047     request_auto_timing_update(s, TRUE);
2048
2049     return o;
2050 }
2051
2052 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2053     pa_operation *o;
2054
2055     pa_assert(s);
2056     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2057     pa_assert(name);
2058
2059     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2060     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2061     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2062
2063     if (s->context->version >= 13) {
2064         pa_proplist *p = pa_proplist_new();
2065
2066         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2067         o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2068         pa_proplist_free(p);
2069     } else {
2070         pa_tagstruct *t;
2071         uint32_t tag;
2072
2073         o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2074         t = pa_tagstruct_command(
2075                 s->context,
2076                 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2077                 &tag);
2078         pa_tagstruct_putu32(t, s->channel);
2079         pa_tagstruct_puts(t, name);
2080         pa_pstream_send_tagstruct(s->context->pstream, t);
2081         pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2082     }
2083
2084     return o;
2085 }
2086
2087 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2088     pa_usec_t usec;
2089
2090     pa_assert(s);
2091     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2092
2093     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2094     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2095     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2096     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2097     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2098     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2099
2100     if (s->smoother)
2101         usec = pa_smoother_get(s->smoother, pa_rtclock_usec());
2102     else
2103         usec = calc_time(s, FALSE);
2104
2105     /* Make sure the time runs monotonically */
2106     if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2107         if (usec < s->previous_time)
2108             usec = s->previous_time;
2109         else
2110             s->previous_time = usec;
2111     }
2112
2113     if (r_usec)
2114         *r_usec = usec;
2115
2116     return 0;
2117 }
2118
2119 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2120     pa_assert(s);
2121     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2122
2123     if (negative)
2124         *negative = 0;
2125
2126     if (a >= b)
2127         return a-b;
2128     else {
2129         if (negative && s->direction == PA_STREAM_RECORD) {
2130             *negative = 1;
2131             return b-a;
2132         } else
2133             return 0;
2134     }
2135 }
2136
2137 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2138     pa_usec_t t, c;
2139     int r;
2140     int64_t cindex;
2141
2142     pa_assert(s);
2143     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2144     pa_assert(r_usec);
2145
2146     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2147     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2148     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2149     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2150     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2151     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2152
2153     if ((r = pa_stream_get_time(s, &t)) < 0)
2154         return r;
2155
2156     if (s->direction == PA_STREAM_PLAYBACK)
2157         cindex = s->timing_info.write_index;
2158     else
2159         cindex = s->timing_info.read_index;
2160
2161     if (cindex < 0)
2162         cindex = 0;
2163
2164     c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2165
2166     if (s->direction == PA_STREAM_PLAYBACK)
2167         *r_usec = time_counter_diff(s, c, t, negative);
2168     else
2169         *r_usec = time_counter_diff(s, t, c, negative);
2170
2171     return 0;
2172 }
2173
2174 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2175     pa_assert(s);
2176     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2177
2178     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2179     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2180     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2181     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2182
2183     return &s->timing_info;
2184 }
2185
2186 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2187     pa_assert(s);
2188     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2189
2190     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2191
2192     return &s->sample_spec;
2193 }
2194
2195 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2196     pa_assert(s);
2197     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2198
2199     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2200
2201     return &s->channel_map;
2202 }
2203
2204 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2205     pa_assert(s);
2206     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2207
2208     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2209     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2210     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2211
2212     return &s->buffer_attr;
2213 }
2214
2215 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2216     pa_operation *o = userdata;
2217     int success = 1;
2218
2219     pa_assert(pd);
2220     pa_assert(o);
2221     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2222
2223     if (!o->context)
2224         goto finish;
2225
2226     if (command != PA_COMMAND_REPLY) {
2227         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2228             goto finish;
2229
2230         success = 0;
2231     } else {
2232         if (o->stream->direction == PA_STREAM_PLAYBACK) {
2233             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2234                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2235                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2236                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2237                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2238                 goto finish;
2239             }
2240         } else if (o->stream->direction == PA_STREAM_RECORD) {
2241             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2242                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2243                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2244                 goto finish;
2245             }
2246         }
2247
2248         if (o->stream->context->version >= 13) {
2249             pa_usec_t usec;
2250
2251             if (pa_tagstruct_get_usec(t, &usec) < 0) {
2252                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2253                 goto finish;
2254             }
2255
2256             if (o->stream->direction == PA_STREAM_RECORD)
2257                 o->stream->timing_info.configured_source_usec = usec;
2258             else
2259                 o->stream->timing_info.configured_sink_usec = usec;
2260         }
2261
2262         if (!pa_tagstruct_eof(t)) {
2263             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2264             goto finish;
2265         }
2266     }
2267
2268     if (o->callback) {
2269         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2270         cb(o->stream, success, o->userdata);
2271     }
2272
2273 finish:
2274     pa_operation_done(o);
2275     pa_operation_unref(o);
2276 }
2277
2278
2279 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2280     pa_operation *o;
2281     pa_tagstruct *t;
2282     uint32_t tag;
2283
2284     pa_assert(s);
2285     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2286     pa_assert(attr);
2287
2288     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2289     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2290     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2291     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2292
2293     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2294
2295     t = pa_tagstruct_command(
2296             s->context,
2297             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2298             &tag);
2299     pa_tagstruct_putu32(t, s->channel);
2300
2301     pa_tagstruct_putu32(t, attr->maxlength);
2302
2303     if (s->direction == PA_STREAM_PLAYBACK)
2304         pa_tagstruct_put(
2305                 t,
2306                 PA_TAG_U32, attr->tlength,
2307                 PA_TAG_U32, attr->prebuf,
2308                 PA_TAG_U32, attr->minreq,
2309                 PA_TAG_INVALID);
2310     else
2311         pa_tagstruct_putu32(t, attr->fragsize);
2312
2313     if (s->context->version >= 13)
2314         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2315
2316     if (s->context->version >= 14)
2317         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2318
2319     pa_pstream_send_tagstruct(s->context->pstream, t);
2320     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2321
2322     /* This might cause changes in the read/write indexex, hence let's
2323      * request a timing update */
2324     request_auto_timing_update(s, TRUE);
2325
2326     return o;
2327 }
2328
2329 uint32_t pa_stream_get_device_index(pa_stream *s) {
2330     pa_assert(s);
2331     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2332
2333     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2334     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2335     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2336     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2337     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2338
2339     return s->device_index;
2340 }
2341
2342 const char *pa_stream_get_device_name(pa_stream *s) {
2343     pa_assert(s);
2344     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2345
2346     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2347     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2348     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2349     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2350     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2351
2352     return s->device_name;
2353 }
2354
2355 int pa_stream_is_suspended(pa_stream *s) {
2356     pa_assert(s);
2357     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2358
2359     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2360     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2361     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2362     PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2363
2364     return s->suspended;
2365 }
2366
2367 int pa_stream_is_corked(pa_stream *s) {
2368     pa_assert(s);
2369     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2370
2371     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2372     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2373     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2374
2375     return s->corked;
2376 }
2377
2378 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2379     pa_operation *o = userdata;
2380     int success = 1;
2381
2382     pa_assert(pd);
2383     pa_assert(o);
2384     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2385
2386     if (!o->context)
2387         goto finish;
2388
2389     if (command != PA_COMMAND_REPLY) {
2390         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2391             goto finish;
2392
2393         success = 0;
2394     } else {
2395
2396         if (!pa_tagstruct_eof(t)) {
2397             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2398             goto finish;
2399         }
2400     }
2401
2402     o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2403     pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2404
2405     if (o->callback) {
2406         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2407         cb(o->stream, success, o->userdata);
2408     }
2409
2410 finish:
2411     pa_operation_done(o);
2412     pa_operation_unref(o);
2413 }
2414
2415
2416 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2417     pa_operation *o;
2418     pa_tagstruct *t;
2419     uint32_t tag;
2420
2421     pa_assert(s);
2422     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2423
2424     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2425     PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2426     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2427     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2428     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2429     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2430
2431     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2432     o->private = PA_UINT_TO_PTR(rate);
2433
2434     t = pa_tagstruct_command(
2435             s->context,
2436             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2437             &tag);
2438     pa_tagstruct_putu32(t, s->channel);
2439     pa_tagstruct_putu32(t, rate);
2440
2441     pa_pstream_send_tagstruct(s->context->pstream, t);
2442     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2443
2444     return o;
2445 }
2446
2447 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2448     pa_operation *o;
2449     pa_tagstruct *t;
2450     uint32_t tag;
2451
2452     pa_assert(s);
2453     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2454
2455     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2456     PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2457     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2458     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2459     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2460
2461     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2462
2463     t = pa_tagstruct_command(
2464             s->context,
2465             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2466             &tag);
2467     pa_tagstruct_putu32(t, s->channel);
2468     pa_tagstruct_putu32(t, (uint32_t) mode);
2469     pa_tagstruct_put_proplist(t, p);
2470
2471     pa_pstream_send_tagstruct(s->context->pstream, t);
2472     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2473
2474     /* Please note that we don't update s->proplist here, because we
2475      * don't export that field */
2476
2477     return o;
2478 }
2479
2480 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2481     pa_operation *o;
2482     pa_tagstruct *t;
2483     uint32_t tag;
2484     const char * const*k;
2485
2486     pa_assert(s);
2487     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2488
2489     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2490     PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2491     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2492     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2493     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2494
2495     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2496
2497     t = pa_tagstruct_command(
2498             s->context,
2499             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2500             &tag);
2501     pa_tagstruct_putu32(t, s->channel);
2502
2503     for (k = keys; *k; k++)
2504         pa_tagstruct_puts(t, *k);
2505
2506     pa_tagstruct_puts(t, NULL);
2507
2508     pa_pstream_send_tagstruct(s->context->pstream, t);
2509     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2510
2511     /* Please note that we don't update s->proplist here, because we
2512      * don't export that field */
2513
2514     return o;
2515 }
2516
2517 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2518     pa_assert(s);
2519     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2520
2521     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2522     PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2523     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2524     PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2525
2526     s->direct_on_input = sink_input_idx;
2527
2528     return 0;
2529 }
2530
2531 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2532     pa_assert(s);
2533     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2534
2535     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2536     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2537     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2538
2539     return s->direct_on_input;
2540 }