properly account for seeks in the requested_bytes counter
[profile/ivi/pulseaudio.git] / src / pulse / stream.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/xmalloc.h>
34
35 #include <pulsecore/pstream-util.h>
36 #include <pulsecore/log.h>
37 #include <pulsecore/hashmap.h>
38 #include <pulsecore/macro.h>
39 #include <pulsecore/rtclock.h>
40
41 #include "fork-detect.h"
42 #include "internal.h"
43
44 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
45 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
46
47 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
48 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
49 #define SMOOTHER_MIN_HISTORY (4)
50
51 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
52     return pa_stream_new_with_proplist(c, name, ss, map, NULL);
53 }
54
55 static void reset_callbacks(pa_stream *s) {
56     s->read_callback = NULL;
57     s->read_userdata = NULL;
58     s->write_callback = NULL;
59     s->write_userdata = NULL;
60     s->state_callback = NULL;
61     s->state_userdata = NULL;
62     s->overflow_callback = NULL;
63     s->overflow_userdata = NULL;
64     s->underflow_callback = NULL;
65     s->underflow_userdata = NULL;
66     s->latency_update_callback = NULL;
67     s->latency_update_userdata = NULL;
68     s->moved_callback = NULL;
69     s->moved_userdata = NULL;
70     s->suspended_callback = NULL;
71     s->suspended_userdata = NULL;
72     s->started_callback = NULL;
73     s->started_userdata = NULL;
74     s->event_callback = NULL;
75     s->event_userdata = NULL;
76     s->buffer_attr_callback = NULL;
77     s->buffer_attr_userdata = NULL;
78 }
79
80 pa_stream *pa_stream_new_with_proplist(
81         pa_context *c,
82         const char *name,
83         const pa_sample_spec *ss,
84         const pa_channel_map *map,
85         pa_proplist *p) {
86
87     pa_stream *s;
88     int i;
89     pa_channel_map tmap;
90
91     pa_assert(c);
92     pa_assert(PA_REFCNT_VALUE(c) >= 1);
93
94     PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
95     PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
96     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
97     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
98     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
99     PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
100     PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
101
102     if (!map)
103         PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
104
105     s = pa_xnew(pa_stream, 1);
106     PA_REFCNT_INIT(s);
107     s->context = c;
108     s->mainloop = c->mainloop;
109
110     s->direction = PA_STREAM_NODIRECTION;
111     s->state = PA_STREAM_UNCONNECTED;
112     s->flags = 0;
113
114     s->sample_spec = *ss;
115     s->channel_map = *map;
116
117     s->direct_on_input = PA_INVALID_INDEX;
118
119     s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
120     if (name)
121         pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
122
123     s->channel = 0;
124     s->channel_valid = FALSE;
125     s->syncid = c->csyncid++;
126     s->stream_index = PA_INVALID_INDEX;
127
128     s->requested_bytes = 0;
129     memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
130
131     /* We initialize der target length here, so that if the user
132      * passes no explicit buffering metrics the default is similar to
133      * what older PA versions provided. */
134
135     s->buffer_attr.maxlength = (uint32_t) -1;
136     s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
137     s->buffer_attr.minreq = (uint32_t) -1;
138     s->buffer_attr.prebuf = (uint32_t) -1;
139     s->buffer_attr.fragsize = (uint32_t) -1;
140
141     s->device_index = PA_INVALID_INDEX;
142     s->device_name = NULL;
143     s->suspended = FALSE;
144     s->corked = FALSE;
145
146     pa_memchunk_reset(&s->peek_memchunk);
147     s->peek_data = NULL;
148
149     s->record_memblockq = NULL;
150
151
152     memset(&s->timing_info, 0, sizeof(s->timing_info));
153     s->timing_info_valid = FALSE;
154
155     s->previous_time = 0;
156
157     s->read_index_not_before = 0;
158     s->write_index_not_before = 0;
159     for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
160         s->write_index_corrections[i].valid = 0;
161     s->current_write_index_correction = 0;
162
163     s->auto_timing_update_event = NULL;
164     s->auto_timing_update_requested = FALSE;
165     s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
166
167     reset_callbacks(s);
168
169     s->smoother = NULL;
170
171     /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
172     PA_LLIST_PREPEND(pa_stream, c->streams, s);
173     pa_stream_ref(s);
174
175     return s;
176 }
177
178 static void stream_unlink(pa_stream *s) {
179     pa_operation *o, *n;
180     pa_assert(s);
181
182     if (!s->context)
183         return;
184
185     /* Detach from context */
186
187     /* Unref all operatio object that point to us */
188     for (o = s->context->operations; o; o = n) {
189         n = o->next;
190
191         if (o->stream == s)
192             pa_operation_cancel(o);
193     }
194
195     /* Drop all outstanding replies for this stream */
196     if (s->context->pdispatch)
197         pa_pdispatch_unregister_reply(s->context->pdispatch, s);
198
199     if (s->channel_valid) {
200         pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL);
201         s->channel = 0;
202         s->channel_valid = FALSE;
203     }
204
205     PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
206     pa_stream_unref(s);
207
208     s->context = NULL;
209
210     if (s->auto_timing_update_event) {
211         pa_assert(s->mainloop);
212         s->mainloop->time_free(s->auto_timing_update_event);
213     }
214
215     reset_callbacks(s);
216 }
217
218 static void stream_free(pa_stream *s) {
219     pa_assert(s);
220
221     stream_unlink(s);
222
223     if (s->peek_memchunk.memblock) {
224         if (s->peek_data)
225             pa_memblock_release(s->peek_memchunk.memblock);
226         pa_memblock_unref(s->peek_memchunk.memblock);
227     }
228
229     if (s->record_memblockq)
230         pa_memblockq_free(s->record_memblockq);
231
232     if (s->proplist)
233         pa_proplist_free(s->proplist);
234
235     if (s->smoother)
236         pa_smoother_free(s->smoother);
237
238     pa_xfree(s->device_name);
239     pa_xfree(s);
240 }
241
242 void pa_stream_unref(pa_stream *s) {
243     pa_assert(s);
244     pa_assert(PA_REFCNT_VALUE(s) >= 1);
245
246     if (PA_REFCNT_DEC(s) <= 0)
247         stream_free(s);
248 }
249
250 pa_stream* pa_stream_ref(pa_stream *s) {
251     pa_assert(s);
252     pa_assert(PA_REFCNT_VALUE(s) >= 1);
253
254     PA_REFCNT_INC(s);
255     return s;
256 }
257
258 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
259     pa_assert(s);
260     pa_assert(PA_REFCNT_VALUE(s) >= 1);
261
262     return s->state;
263 }
264
265 pa_context* pa_stream_get_context(pa_stream *s) {
266     pa_assert(s);
267     pa_assert(PA_REFCNT_VALUE(s) >= 1);
268
269     return s->context;
270 }
271
272 uint32_t pa_stream_get_index(pa_stream *s) {
273     pa_assert(s);
274     pa_assert(PA_REFCNT_VALUE(s) >= 1);
275
276     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
277     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
278
279     return s->stream_index;
280 }
281
282 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
283     pa_assert(s);
284     pa_assert(PA_REFCNT_VALUE(s) >= 1);
285
286     if (s->state == st)
287         return;
288
289     pa_stream_ref(s);
290
291     s->state = st;
292
293     if (s->state_callback)
294         s->state_callback(s, s->state_userdata);
295
296     if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
297         stream_unlink(s);
298
299     pa_stream_unref(s);
300 }
301
302 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
303     pa_assert(s);
304     pa_assert(PA_REFCNT_VALUE(s) >= 1);
305
306     if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
307         return;
308
309     if (s->state == PA_STREAM_READY &&
310         (force || !s->auto_timing_update_requested)) {
311         pa_operation *o;
312
313 /*         pa_log("Automatically requesting new timing data"); */
314
315         if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
316             pa_operation_unref(o);
317             s->auto_timing_update_requested = TRUE;
318         }
319     }
320
321     if (s->auto_timing_update_event) {
322         struct timeval next;
323
324         if (force)
325             s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
326
327         pa_gettimeofday(&next);
328         pa_timeval_add(&next, s->auto_timing_interval_usec);
329         s->mainloop->time_restart(s->auto_timing_update_event, &next);
330
331         s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
332     }
333 }
334
335 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
336     pa_context *c = userdata;
337     pa_stream *s;
338     uint32_t channel;
339
340     pa_assert(pd);
341     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
342     pa_assert(t);
343     pa_assert(c);
344     pa_assert(PA_REFCNT_VALUE(c) >= 1);
345
346     pa_context_ref(c);
347
348     if (pa_tagstruct_getu32(t, &channel) < 0 ||
349         !pa_tagstruct_eof(t)) {
350         pa_context_fail(c, PA_ERR_PROTOCOL);
351         goto finish;
352     }
353
354     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel)))
355         goto finish;
356
357     if (s->state != PA_STREAM_READY)
358         goto finish;
359
360     pa_context_set_error(c, PA_ERR_KILLED);
361     pa_stream_set_state(s, PA_STREAM_FAILED);
362
363 finish:
364     pa_context_unref(c);
365 }
366
367 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
368     pa_usec_t x;
369
370     pa_assert(s);
371     pa_assert(!force_start || !force_stop);
372
373     if (!s->smoother)
374         return;
375
376     x = pa_rtclock_usec();
377
378     if (s->timing_info_valid) {
379         if (aposteriori)
380             x -= s->timing_info.transport_usec;
381         else
382             x += s->timing_info.transport_usec;
383
384         if (s->direction == PA_STREAM_PLAYBACK)
385             /* it takes a while until the pause/resume is actually
386              * audible */
387             x += s->timing_info.sink_usec;
388         else
389             /* Data froma  while back will be dropped */
390             x -= s->timing_info.source_usec;
391     }
392
393     if (s->suspended || s->corked || force_stop)
394         pa_smoother_pause(s->smoother, x);
395     else if (force_start || s->buffer_attr.prebuf == 0)
396         pa_smoother_resume(s->smoother, x);
397
398     /* Please note that we have no idea if playback actually started
399      * if prebuf is non-zero! */
400 }
401
402 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
403     pa_context *c = userdata;
404     pa_stream *s;
405     uint32_t channel;
406     const char *dn;
407     pa_bool_t suspended;
408     uint32_t di;
409     pa_usec_t usec = 0;
410     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
411
412     pa_assert(pd);
413     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
414     pa_assert(t);
415     pa_assert(c);
416     pa_assert(PA_REFCNT_VALUE(c) >= 1);
417
418     pa_context_ref(c);
419
420     if (c->version < 12) {
421         pa_context_fail(c, PA_ERR_PROTOCOL);
422         goto finish;
423     }
424
425     if (pa_tagstruct_getu32(t, &channel) < 0 ||
426         pa_tagstruct_getu32(t, &di) < 0 ||
427         pa_tagstruct_gets(t, &dn) < 0 ||
428         pa_tagstruct_get_boolean(t, &suspended) < 0) {
429         pa_context_fail(c, PA_ERR_PROTOCOL);
430         goto finish;
431     }
432
433     if (c->version >= 13) {
434
435         if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
436             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
437                 pa_tagstruct_getu32(t, &fragsize) < 0 ||
438                 pa_tagstruct_get_usec(t, &usec) < 0) {
439                 pa_context_fail(c, PA_ERR_PROTOCOL);
440                 goto finish;
441             }
442         } else {
443             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
444                 pa_tagstruct_getu32(t, &tlength) < 0 ||
445                 pa_tagstruct_getu32(t, &prebuf) < 0 ||
446                 pa_tagstruct_getu32(t, &minreq) < 0 ||
447                 pa_tagstruct_get_usec(t, &usec) < 0) {
448                 pa_context_fail(c, PA_ERR_PROTOCOL);
449                 goto finish;
450             }
451         }
452     }
453
454     if (!pa_tagstruct_eof(t)) {
455         pa_context_fail(c, PA_ERR_PROTOCOL);
456         goto finish;
457     }
458
459     if (!dn || di == PA_INVALID_INDEX) {
460         pa_context_fail(c, PA_ERR_PROTOCOL);
461         goto finish;
462     }
463
464     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, channel)))
465         goto finish;
466
467     if (s->state != PA_STREAM_READY)
468         goto finish;
469
470     if (c->version >= 13) {
471         if (s->direction == PA_STREAM_RECORD)
472             s->timing_info.configured_source_usec = usec;
473         else
474             s->timing_info.configured_sink_usec = usec;
475
476         s->buffer_attr.maxlength = maxlength;
477         s->buffer_attr.fragsize = fragsize;
478         s->buffer_attr.tlength = tlength;
479         s->buffer_attr.prebuf = prebuf;
480         s->buffer_attr.minreq = minreq;
481     }
482
483     pa_xfree(s->device_name);
484     s->device_name = pa_xstrdup(dn);
485     s->device_index = di;
486
487     s->suspended = suspended;
488
489     check_smoother_status(s, TRUE, FALSE, FALSE);
490     request_auto_timing_update(s, TRUE);
491
492     if (s->moved_callback)
493         s->moved_callback(s, s->moved_userdata);
494
495 finish:
496     pa_context_unref(c);
497 }
498
499 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
500     pa_context *c = userdata;
501     pa_stream *s;
502     uint32_t channel;
503     pa_usec_t usec = 0;
504     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
505
506     pa_assert(pd);
507     pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
508     pa_assert(t);
509     pa_assert(c);
510     pa_assert(PA_REFCNT_VALUE(c) >= 1);
511
512     pa_context_ref(c);
513
514     if (c->version < 15) {
515         pa_context_fail(c, PA_ERR_PROTOCOL);
516         goto finish;
517     }
518
519     if (pa_tagstruct_getu32(t, &channel) < 0) {
520         pa_context_fail(c, PA_ERR_PROTOCOL);
521         goto finish;
522     }
523
524     if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
525         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
526             pa_tagstruct_getu32(t, &fragsize) < 0 ||
527             pa_tagstruct_get_usec(t, &usec) < 0) {
528             pa_context_fail(c, PA_ERR_PROTOCOL);
529             goto finish;
530         }
531     } else {
532         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
533             pa_tagstruct_getu32(t, &tlength) < 0 ||
534             pa_tagstruct_getu32(t, &prebuf) < 0 ||
535             pa_tagstruct_getu32(t, &minreq) < 0 ||
536             pa_tagstruct_get_usec(t, &usec) < 0) {
537             pa_context_fail(c, PA_ERR_PROTOCOL);
538             goto finish;
539         }
540     }
541
542     if (!pa_tagstruct_eof(t)) {
543         pa_context_fail(c, PA_ERR_PROTOCOL);
544         goto finish;
545     }
546
547     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, channel)))
548         goto finish;
549
550     if (s->state != PA_STREAM_READY)
551         goto finish;
552
553     if (s->direction == PA_STREAM_RECORD)
554         s->timing_info.configured_source_usec = usec;
555     else
556         s->timing_info.configured_sink_usec = usec;
557
558     s->buffer_attr.maxlength = maxlength;
559     s->buffer_attr.fragsize = fragsize;
560     s->buffer_attr.tlength = tlength;
561     s->buffer_attr.prebuf = prebuf;
562     s->buffer_attr.minreq = minreq;
563
564     request_auto_timing_update(s, TRUE);
565
566     if (s->buffer_attr_callback)
567         s->buffer_attr_callback(s, s->buffer_attr_userdata);
568
569 finish:
570     pa_context_unref(c);
571 }
572
573 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
574     pa_context *c = userdata;
575     pa_stream *s;
576     uint32_t channel;
577     pa_bool_t suspended;
578
579     pa_assert(pd);
580     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
581     pa_assert(t);
582     pa_assert(c);
583     pa_assert(PA_REFCNT_VALUE(c) >= 1);
584
585     pa_context_ref(c);
586
587     if (c->version < 12) {
588         pa_context_fail(c, PA_ERR_PROTOCOL);
589         goto finish;
590     }
591
592     if (pa_tagstruct_getu32(t, &channel) < 0 ||
593         pa_tagstruct_get_boolean(t, &suspended) < 0 ||
594         !pa_tagstruct_eof(t)) {
595         pa_context_fail(c, PA_ERR_PROTOCOL);
596         goto finish;
597     }
598
599     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, channel)))
600         goto finish;
601
602     if (s->state != PA_STREAM_READY)
603         goto finish;
604
605     s->suspended = suspended;
606
607     check_smoother_status(s, TRUE, FALSE, FALSE);
608     request_auto_timing_update(s, TRUE);
609
610     if (s->suspended_callback)
611         s->suspended_callback(s, s->suspended_userdata);
612
613 finish:
614     pa_context_unref(c);
615 }
616
617 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
618     pa_context *c = userdata;
619     pa_stream *s;
620     uint32_t channel;
621
622     pa_assert(pd);
623     pa_assert(command == PA_COMMAND_STARTED);
624     pa_assert(t);
625     pa_assert(c);
626     pa_assert(PA_REFCNT_VALUE(c) >= 1);
627
628     pa_context_ref(c);
629
630     if (c->version < 13) {
631         pa_context_fail(c, PA_ERR_PROTOCOL);
632         goto finish;
633     }
634
635     if (pa_tagstruct_getu32(t, &channel) < 0 ||
636         !pa_tagstruct_eof(t)) {
637         pa_context_fail(c, PA_ERR_PROTOCOL);
638         goto finish;
639     }
640
641     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
642         goto finish;
643
644     if (s->state != PA_STREAM_READY)
645         goto finish;
646
647     check_smoother_status(s, TRUE, TRUE, FALSE);
648     request_auto_timing_update(s, TRUE);
649
650     if (s->started_callback)
651         s->started_callback(s, s->started_userdata);
652
653 finish:
654     pa_context_unref(c);
655 }
656
657 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
658     pa_context *c = userdata;
659     pa_stream *s;
660     uint32_t channel;
661     pa_proplist *pl = NULL;
662     const char *event;
663
664     pa_assert(pd);
665     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
666     pa_assert(t);
667     pa_assert(c);
668     pa_assert(PA_REFCNT_VALUE(c) >= 1);
669
670     pa_context_ref(c);
671
672     if (c->version < 15) {
673         pa_context_fail(c, PA_ERR_PROTOCOL);
674         goto finish;
675     }
676
677     pl = pa_proplist_new();
678
679     if (pa_tagstruct_getu32(t, &channel) < 0 ||
680         pa_tagstruct_gets(t, &event) < 0 ||
681         pa_tagstruct_get_proplist(t, pl) < 0 ||
682         !pa_tagstruct_eof(t) || !event) {
683         pa_context_fail(c, PA_ERR_PROTOCOL);
684         goto finish;
685     }
686
687     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, channel)))
688         goto finish;
689
690     if (s->state != PA_STREAM_READY)
691         goto finish;
692
693     if (s->event_callback)
694         s->event_callback(s, event, pl, s->event_userdata);
695
696 finish:
697     pa_context_unref(c);
698
699     if (pl)
700         pa_proplist_free(pl);
701 }
702
703 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
704     pa_stream *s;
705     pa_context *c = userdata;
706     uint32_t bytes, channel;
707
708     pa_assert(pd);
709     pa_assert(command == PA_COMMAND_REQUEST);
710     pa_assert(t);
711     pa_assert(c);
712     pa_assert(PA_REFCNT_VALUE(c) >= 1);
713
714     pa_context_ref(c);
715
716     if (pa_tagstruct_getu32(t, &channel) < 0 ||
717         pa_tagstruct_getu32(t, &bytes) < 0 ||
718         !pa_tagstruct_eof(t)) {
719         pa_context_fail(c, PA_ERR_PROTOCOL);
720         goto finish;
721     }
722
723     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
724         goto finish;
725
726     if (s->state != PA_STREAM_READY)
727         goto finish;
728
729     s->requested_bytes += bytes;
730
731     if (s->requested_bytes > 0 && s->write_callback)
732         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
733
734 finish:
735     pa_context_unref(c);
736 }
737
738 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
739     pa_stream *s;
740     pa_context *c = userdata;
741     uint32_t channel;
742
743     pa_assert(pd);
744     pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
745     pa_assert(t);
746     pa_assert(c);
747     pa_assert(PA_REFCNT_VALUE(c) >= 1);
748
749     pa_context_ref(c);
750
751     if (pa_tagstruct_getu32(t, &channel) < 0 ||
752         !pa_tagstruct_eof(t)) {
753         pa_context_fail(c, PA_ERR_PROTOCOL);
754         goto finish;
755     }
756
757     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
758         goto finish;
759
760     if (s->state != PA_STREAM_READY)
761         goto finish;
762
763     if (s->buffer_attr.prebuf > 0)
764         check_smoother_status(s, TRUE, FALSE, TRUE);
765
766     request_auto_timing_update(s, TRUE);
767
768     if (command == PA_COMMAND_OVERFLOW) {
769         if (s->overflow_callback)
770             s->overflow_callback(s, s->overflow_userdata);
771     } else if (command == PA_COMMAND_UNDERFLOW) {
772         if (s->underflow_callback)
773             s->underflow_callback(s, s->underflow_userdata);
774     }
775
776  finish:
777     pa_context_unref(c);
778 }
779
780 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
781     pa_assert(s);
782     pa_assert(PA_REFCNT_VALUE(s) >= 1);
783
784 /*     pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
785
786     if (s->state != PA_STREAM_READY)
787         return;
788
789     if (w) {
790         s->write_index_not_before = s->context->ctag;
791
792         if (s->timing_info_valid)
793             s->timing_info.write_index_corrupt = TRUE;
794
795 /*         pa_log("write_index invalidated"); */
796     }
797
798     if (r) {
799         s->read_index_not_before = s->context->ctag;
800
801         if (s->timing_info_valid)
802             s->timing_info.read_index_corrupt = TRUE;
803
804 /*         pa_log("read_index invalidated"); */
805     }
806
807     request_auto_timing_update(s, TRUE);
808 }
809
810 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *tv, void *userdata) {
811     pa_stream *s = userdata;
812
813     pa_assert(s);
814     pa_assert(PA_REFCNT_VALUE(s) >= 1);
815
816     pa_stream_ref(s);
817     request_auto_timing_update(s, FALSE);
818     pa_stream_unref(s);
819 }
820
821 static void create_stream_complete(pa_stream *s) {
822     pa_assert(s);
823     pa_assert(PA_REFCNT_VALUE(s) >= 1);
824     pa_assert(s->state == PA_STREAM_CREATING);
825
826     pa_stream_set_state(s, PA_STREAM_READY);
827
828     if (s->requested_bytes > 0 && s->write_callback)
829         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
830
831     if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
832         struct timeval tv;
833         pa_gettimeofday(&tv);
834         s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
835         pa_timeval_add(&tv, s->auto_timing_interval_usec);
836         pa_assert(!s->auto_timing_update_event);
837         s->auto_timing_update_event = s->mainloop->time_new(s->mainloop, &tv, &auto_timing_update_callback, s);
838
839         request_auto_timing_update(s, TRUE);
840     }
841
842     check_smoother_status(s, TRUE, FALSE, FALSE);
843 }
844
845 static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_sample_spec *ss) {
846     pa_assert(s);
847     pa_assert(attr);
848     pa_assert(ss);
849
850     if (s->context->version >= 13)
851         return;
852
853     /* Version older than 0.9.10 didn't do server side buffer_attr
854      * selection, hence we have to fake it on the client side. */
855
856     /* We choose fairly conservative values here, to not confuse
857      * old clients with extremely large playback buffers */
858
859     if (attr->maxlength == (uint32_t) -1)
860         attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
861
862     if (attr->tlength == (uint32_t) -1)
863         attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
864
865     if (attr->minreq == (uint32_t) -1)
866         attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
867
868     if (attr->prebuf == (uint32_t) -1)
869         attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
870
871     if (attr->fragsize == (uint32_t) -1)
872         attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
873 }
874
875 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
876     pa_stream *s = userdata;
877     uint32_t requested_bytes;
878
879     pa_assert(pd);
880     pa_assert(s);
881     pa_assert(PA_REFCNT_VALUE(s) >= 1);
882     pa_assert(s->state == PA_STREAM_CREATING);
883
884     pa_stream_ref(s);
885
886     if (command != PA_COMMAND_REPLY) {
887         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
888             goto finish;
889
890         pa_stream_set_state(s, PA_STREAM_FAILED);
891         goto finish;
892     }
893
894     if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
895         s->channel == PA_INVALID_INDEX ||
896         ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 ||  s->stream_index == PA_INVALID_INDEX)) ||
897         ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
898         pa_context_fail(s->context, PA_ERR_PROTOCOL);
899         goto finish;
900     }
901
902     s->requested_bytes = (int64_t) requested_bytes;
903
904     if (s->context->version >= 9) {
905         if (s->direction == PA_STREAM_PLAYBACK) {
906             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
907                 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
908                 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
909                 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
910                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
911                 goto finish;
912             }
913         } else if (s->direction == PA_STREAM_RECORD) {
914             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
915                 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
916                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
917                 goto finish;
918             }
919         }
920     }
921
922     if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
923         pa_sample_spec ss;
924         pa_channel_map cm;
925         const char *dn = NULL;
926         pa_bool_t suspended;
927
928         if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
929             pa_tagstruct_get_channel_map(t, &cm) < 0 ||
930             pa_tagstruct_getu32(t, &s->device_index) < 0 ||
931             pa_tagstruct_gets(t, &dn) < 0 ||
932             pa_tagstruct_get_boolean(t, &suspended) < 0) {
933             pa_context_fail(s->context, PA_ERR_PROTOCOL);
934             goto finish;
935         }
936
937         if (!dn || s->device_index == PA_INVALID_INDEX ||
938             ss.channels != cm.channels ||
939             !pa_channel_map_valid(&cm) ||
940             !pa_sample_spec_valid(&ss) ||
941             (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
942             (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
943             (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))) {
944             pa_context_fail(s->context, PA_ERR_PROTOCOL);
945             goto finish;
946         }
947
948         pa_xfree(s->device_name);
949         s->device_name = pa_xstrdup(dn);
950         s->suspended = suspended;
951
952         s->channel_map = cm;
953         s->sample_spec = ss;
954     }
955
956     if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
957         pa_usec_t usec;
958
959         if (pa_tagstruct_get_usec(t, &usec) < 0) {
960             pa_context_fail(s->context, PA_ERR_PROTOCOL);
961             goto finish;
962         }
963
964         if (s->direction == PA_STREAM_RECORD)
965             s->timing_info.configured_source_usec = usec;
966         else
967             s->timing_info.configured_sink_usec = usec;
968     }
969
970     if (!pa_tagstruct_eof(t)) {
971         pa_context_fail(s->context, PA_ERR_PROTOCOL);
972         goto finish;
973     }
974
975     if (s->direction == PA_STREAM_RECORD) {
976         pa_assert(!s->record_memblockq);
977
978         s->record_memblockq = pa_memblockq_new(
979                 0,
980                 s->buffer_attr.maxlength,
981                 0,
982                 pa_frame_size(&s->sample_spec),
983                 1,
984                 0,
985                 0,
986                 NULL);
987     }
988
989     s->channel_valid = TRUE;
990     pa_dynarray_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, s->channel, s);
991
992     create_stream_complete(s);
993
994 finish:
995     pa_stream_unref(s);
996 }
997
998 static int create_stream(
999         pa_stream_direction_t direction,
1000         pa_stream *s,
1001         const char *dev,
1002         const pa_buffer_attr *attr,
1003         pa_stream_flags_t flags,
1004         const pa_cvolume *volume,
1005         pa_stream *sync_stream) {
1006
1007     pa_tagstruct *t;
1008     uint32_t tag;
1009     pa_bool_t volume_set = FALSE;
1010
1011     pa_assert(s);
1012     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1013     pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1014
1015     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1016     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1017     PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1018     PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1019                                               PA_STREAM_INTERPOLATE_TIMING|
1020                                               PA_STREAM_NOT_MONOTONIC|
1021                                               PA_STREAM_AUTO_TIMING_UPDATE|
1022                                               PA_STREAM_NO_REMAP_CHANNELS|
1023                                               PA_STREAM_NO_REMIX_CHANNELS|
1024                                               PA_STREAM_FIX_FORMAT|
1025                                               PA_STREAM_FIX_RATE|
1026                                               PA_STREAM_FIX_CHANNELS|
1027                                               PA_STREAM_DONT_MOVE|
1028                                               PA_STREAM_VARIABLE_RATE|
1029                                               PA_STREAM_PEAK_DETECT|
1030                                               PA_STREAM_START_MUTED|
1031                                               PA_STREAM_ADJUST_LATENCY|
1032                                               PA_STREAM_EARLY_REQUESTS|
1033                                               PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1034                                               PA_STREAM_START_UNMUTED|
1035                                               PA_STREAM_FAIL_ON_SUSPEND)), PA_ERR_INVALID);
1036
1037     PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1038     PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1039     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1040     /* Althought some of the other flags are not supported on older
1041      * version, we don't check for them here, because it doesn't hurt
1042      * when they are passed but actually not supported. This makes
1043      * client development easier */
1044
1045     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1046     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1047     PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
1048     PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1049     PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1050
1051     pa_stream_ref(s);
1052
1053     s->direction = direction;
1054     s->flags = flags;
1055     s->corked = !!(flags & PA_STREAM_START_CORKED);
1056
1057     if (sync_stream)
1058         s->syncid = sync_stream->syncid;
1059
1060     if (attr)
1061         s->buffer_attr = *attr;
1062     automatic_buffer_attr(s, &s->buffer_attr, &s->sample_spec);
1063
1064     if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1065         pa_usec_t x;
1066
1067         if (s->smoother)
1068             pa_smoother_free(s->smoother);
1069
1070         s->smoother = pa_smoother_new(SMOOTHER_ADJUST_TIME, SMOOTHER_HISTORY_TIME, !(flags & PA_STREAM_NOT_MONOTONIC), SMOOTHER_MIN_HISTORY);
1071
1072         x = pa_rtclock_usec();
1073         pa_smoother_set_time_offset(s->smoother, x);
1074         pa_smoother_pause(s->smoother, x);
1075     }
1076
1077     if (!dev)
1078         dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1079
1080     t = pa_tagstruct_command(
1081             s->context,
1082             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1083             &tag);
1084
1085     if (s->context->version < 13)
1086         pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1087
1088     pa_tagstruct_put(
1089             t,
1090             PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1091             PA_TAG_CHANNEL_MAP, &s->channel_map,
1092             PA_TAG_U32, PA_INVALID_INDEX,
1093             PA_TAG_STRING, dev,
1094             PA_TAG_U32, s->buffer_attr.maxlength,
1095             PA_TAG_BOOLEAN, s->corked,
1096             PA_TAG_INVALID);
1097
1098     if (s->direction == PA_STREAM_PLAYBACK) {
1099         pa_cvolume cv;
1100
1101         pa_tagstruct_put(
1102                 t,
1103                 PA_TAG_U32, s->buffer_attr.tlength,
1104                 PA_TAG_U32, s->buffer_attr.prebuf,
1105                 PA_TAG_U32, s->buffer_attr.minreq,
1106                 PA_TAG_U32, s->syncid,
1107                 PA_TAG_INVALID);
1108
1109         volume_set = !!volume;
1110
1111         if (!volume)
1112             volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1113
1114         pa_tagstruct_put_cvolume(t, volume);
1115     } else
1116         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1117
1118     if (s->context->version >= 12) {
1119         pa_tagstruct_put(
1120                 t,
1121                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1122                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1123                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1124                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1125                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1126                 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1127                 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1128                 PA_TAG_INVALID);
1129     }
1130
1131     if (s->context->version >= 13) {
1132
1133         if (s->direction == PA_STREAM_PLAYBACK)
1134             pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1135         else
1136             pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1137
1138         pa_tagstruct_put(
1139                 t,
1140                 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1141                 PA_TAG_PROPLIST, s->proplist,
1142                 PA_TAG_INVALID);
1143
1144         if (s->direction == PA_STREAM_RECORD)
1145             pa_tagstruct_putu32(t, s->direct_on_input);
1146     }
1147
1148     if (s->context->version >= 14) {
1149
1150         if (s->direction == PA_STREAM_PLAYBACK)
1151             pa_tagstruct_put_boolean(t, volume_set);
1152
1153         pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1154     }
1155
1156     if (s->context->version >= 15) {
1157
1158         if (s->direction == PA_STREAM_PLAYBACK)
1159             pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1160
1161         pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1162         pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1163     }
1164
1165     pa_pstream_send_tagstruct(s->context->pstream, t);
1166     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1167
1168     pa_stream_set_state(s, PA_STREAM_CREATING);
1169
1170     pa_stream_unref(s);
1171     return 0;
1172 }
1173
1174 int pa_stream_connect_playback(
1175         pa_stream *s,
1176         const char *dev,
1177         const pa_buffer_attr *attr,
1178         pa_stream_flags_t flags,
1179         pa_cvolume *volume,
1180         pa_stream *sync_stream) {
1181
1182     pa_assert(s);
1183     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1184
1185     return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1186 }
1187
1188 int pa_stream_connect_record(
1189         pa_stream *s,
1190         const char *dev,
1191         const pa_buffer_attr *attr,
1192         pa_stream_flags_t flags) {
1193
1194     pa_assert(s);
1195     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1196
1197     return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1198 }
1199
1200 int pa_stream_write(
1201         pa_stream *s,
1202         const void *data,
1203         size_t length,
1204         void (*free_cb)(void *p),
1205         int64_t offset,
1206         pa_seek_mode_t seek) {
1207
1208     pa_memchunk chunk;
1209     pa_seek_mode_t t_seek;
1210     int64_t t_offset;
1211     size_t t_length;
1212     const void *t_data;
1213
1214     pa_assert(s);
1215     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1216     pa_assert(data);
1217
1218     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1219     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1220     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1221     PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1222     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1223
1224     if (length <= 0)
1225         return 0;
1226
1227     t_seek = seek;
1228     t_offset = offset;
1229     t_length = length;
1230     t_data = data;
1231
1232     while (t_length > 0) {
1233
1234         chunk.index = 0;
1235
1236         if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1237             chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1238             chunk.length = t_length;
1239         } else {
1240             void *d;
1241
1242             chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1243             chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1244
1245             d = pa_memblock_acquire(chunk.memblock);
1246             memcpy(d, t_data, chunk.length);
1247             pa_memblock_release(chunk.memblock);
1248         }
1249
1250         pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1251
1252         t_offset = 0;
1253         t_seek = PA_SEEK_RELATIVE;
1254
1255         t_data = (const uint8_t*) t_data + chunk.length;
1256         t_length -= chunk.length;
1257
1258         pa_memblock_unref(chunk.memblock);
1259     }
1260
1261     if (free_cb && pa_pstream_get_shm(s->context->pstream))
1262         free_cb((void*) data);
1263
1264     /* This is obviously wrong since we ignore the seeking index . But
1265      * that's OK, the server side applies the same error */
1266     s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1267
1268     if (s->direction == PA_STREAM_PLAYBACK) {
1269
1270         /* Update latency request correction */
1271         if (s->write_index_corrections[s->current_write_index_correction].valid) {
1272
1273             if (seek == PA_SEEK_ABSOLUTE) {
1274                 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1275                 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1276                 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1277             } else if (seek == PA_SEEK_RELATIVE) {
1278                 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1279                     s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1280             } else
1281                 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1282         }
1283
1284         /* Update the write index in the already available latency data */
1285         if (s->timing_info_valid) {
1286
1287             if (seek == PA_SEEK_ABSOLUTE) {
1288                 s->timing_info.write_index_corrupt = FALSE;
1289                 s->timing_info.write_index = offset + (int64_t) length;
1290             } else if (seek == PA_SEEK_RELATIVE) {
1291                 if (!s->timing_info.write_index_corrupt)
1292                     s->timing_info.write_index += offset + (int64_t) length;
1293             } else
1294                 s->timing_info.write_index_corrupt = TRUE;
1295         }
1296
1297         if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1298             request_auto_timing_update(s, TRUE);
1299     }
1300
1301     return 0;
1302 }
1303
1304 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1305     pa_assert(s);
1306     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1307     pa_assert(data);
1308     pa_assert(length);
1309
1310     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1311     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1312     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1313
1314     if (!s->peek_memchunk.memblock) {
1315
1316         if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1317             *data = NULL;
1318             *length = 0;
1319             return 0;
1320         }
1321
1322         s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1323     }
1324
1325     pa_assert(s->peek_data);
1326     *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1327     *length = s->peek_memchunk.length;
1328     return 0;
1329 }
1330
1331 int pa_stream_drop(pa_stream *s) {
1332     pa_assert(s);
1333     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1334
1335     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1336     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1337     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1338     PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1339
1340     pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1341
1342     /* Fix the simulated local read index */
1343     if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1344         s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1345
1346     pa_assert(s->peek_data);
1347     pa_memblock_release(s->peek_memchunk.memblock);
1348     pa_memblock_unref(s->peek_memchunk.memblock);
1349     pa_memchunk_reset(&s->peek_memchunk);
1350
1351     return 0;
1352 }
1353
1354 size_t pa_stream_writable_size(pa_stream *s) {
1355     pa_assert(s);
1356     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1357
1358     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1359     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1360     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1361
1362     return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1363 }
1364
1365 size_t pa_stream_readable_size(pa_stream *s) {
1366     pa_assert(s);
1367     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1368
1369     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1370     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1371     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1372
1373     return pa_memblockq_get_length(s->record_memblockq);
1374 }
1375
1376 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1377     pa_operation *o;
1378     pa_tagstruct *t;
1379     uint32_t tag;
1380
1381     pa_assert(s);
1382     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1383
1384     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1385     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1386     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1387
1388     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1389
1390     t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1391     pa_tagstruct_putu32(t, s->channel);
1392     pa_pstream_send_tagstruct(s->context->pstream, t);
1393     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1394
1395     return o;
1396 }
1397
1398 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1399     pa_usec_t usec;
1400
1401     pa_assert(s);
1402     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1403     pa_assert(s->state == PA_STREAM_READY);
1404     pa_assert(s->direction != PA_STREAM_UPLOAD);
1405     pa_assert(s->timing_info_valid);
1406     pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1407     pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1408
1409     if (s->direction == PA_STREAM_PLAYBACK) {
1410         /* The last byte that was written into the output device
1411          * had this time value associated */
1412         usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1413
1414         if (!s->corked && !s->suspended) {
1415
1416             if (!ignore_transport)
1417                 /* Because the latency info took a little time to come
1418                  * to us, we assume that the real output time is actually
1419                  * a little ahead */
1420                 usec += s->timing_info.transport_usec;
1421
1422             /* However, the output device usually maintains a buffer
1423                too, hence the real sample currently played is a little
1424                back  */
1425             if (s->timing_info.sink_usec >= usec)
1426                 usec = 0;
1427             else
1428                 usec -= s->timing_info.sink_usec;
1429         }
1430
1431     } else {
1432         pa_assert(s->direction == PA_STREAM_RECORD);
1433
1434         /* The last byte written into the server side queue had
1435          * this time value associated */
1436         usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1437
1438         if (!s->corked && !s->suspended) {
1439
1440             if (!ignore_transport)
1441                 /* Add transport latency */
1442                 usec += s->timing_info.transport_usec;
1443
1444             /* Add latency of data in device buffer */
1445             usec += s->timing_info.source_usec;
1446
1447             /* If this is a monitor source, we need to correct the
1448              * time by the playback device buffer */
1449             if (s->timing_info.sink_usec >= usec)
1450                 usec = 0;
1451             else
1452                 usec -= s->timing_info.sink_usec;
1453         }
1454     }
1455
1456     return usec;
1457 }
1458
1459 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1460     pa_operation *o = userdata;
1461     struct timeval local, remote, now;
1462     pa_timing_info *i;
1463     pa_bool_t playing = FALSE;
1464     uint64_t underrun_for = 0, playing_for = 0;
1465
1466     pa_assert(pd);
1467     pa_assert(o);
1468     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1469
1470     if (!o->context || !o->stream)
1471         goto finish;
1472
1473     i = &o->stream->timing_info;
1474
1475     o->stream->timing_info_valid = FALSE;
1476     i->write_index_corrupt = TRUE;
1477     i->read_index_corrupt = TRUE;
1478
1479     if (command != PA_COMMAND_REPLY) {
1480         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1481             goto finish;
1482
1483     } else {
1484
1485         if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1486             pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1487             pa_tagstruct_get_boolean(t, &playing) < 0 ||
1488             pa_tagstruct_get_timeval(t, &local) < 0 ||
1489             pa_tagstruct_get_timeval(t, &remote) < 0 ||
1490             pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1491             pa_tagstruct_gets64(t, &i->read_index) < 0) {
1492
1493             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1494             goto finish;
1495         }
1496
1497         if (o->context->version >= 13 &&
1498             o->stream->direction == PA_STREAM_PLAYBACK)
1499             if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1500                 pa_tagstruct_getu64(t, &playing_for) < 0) {
1501
1502                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1503                 goto finish;
1504             }
1505
1506
1507         if (!pa_tagstruct_eof(t)) {
1508             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1509             goto finish;
1510         }
1511         o->stream->timing_info_valid = TRUE;
1512         i->write_index_corrupt = FALSE;
1513         i->read_index_corrupt = FALSE;
1514
1515         i->playing = (int) playing;
1516         i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1517
1518         pa_gettimeofday(&now);
1519
1520         /* Calculcate timestamps */
1521         if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1522             /* local and remote seem to have synchronized clocks */
1523
1524             if (o->stream->direction == PA_STREAM_PLAYBACK)
1525                 i->transport_usec = pa_timeval_diff(&remote, &local);
1526             else
1527                 i->transport_usec = pa_timeval_diff(&now, &remote);
1528
1529             i->synchronized_clocks = TRUE;
1530             i->timestamp = remote;
1531         } else {
1532             /* clocks are not synchronized, let's estimate latency then */
1533             i->transport_usec = pa_timeval_diff(&now, &local)/2;
1534             i->synchronized_clocks = FALSE;
1535             i->timestamp = local;
1536             pa_timeval_add(&i->timestamp, i->transport_usec);
1537         }
1538
1539         /* Invalidate read and write indexes if necessary */
1540         if (tag < o->stream->read_index_not_before)
1541             i->read_index_corrupt = TRUE;
1542
1543         if (tag < o->stream->write_index_not_before)
1544             i->write_index_corrupt = TRUE;
1545
1546         if (o->stream->direction == PA_STREAM_PLAYBACK) {
1547             /* Write index correction */
1548
1549             int n, j;
1550             uint32_t ctag = tag;
1551
1552             /* Go through the saved correction values and add up the
1553              * total correction.*/
1554             for (n = 0, j = o->stream->current_write_index_correction+1;
1555                  n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1556                  n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1557
1558                 /* Step over invalid data or out-of-date data */
1559                 if (!o->stream->write_index_corrections[j].valid ||
1560                     o->stream->write_index_corrections[j].tag < ctag)
1561                     continue;
1562
1563                 /* Make sure that everything is in order */
1564                 ctag = o->stream->write_index_corrections[j].tag+1;
1565
1566                 /* Now fix the write index */
1567                 if (o->stream->write_index_corrections[j].corrupt) {
1568                     /* A corrupting seek was made */
1569                     i->write_index_corrupt = TRUE;
1570                 } else if (o->stream->write_index_corrections[j].absolute) {
1571                     /* An absolute seek was made */
1572                     i->write_index = o->stream->write_index_corrections[j].value;
1573                     i->write_index_corrupt = FALSE;
1574                 } else if (!i->write_index_corrupt) {
1575                     /* A relative seek was made */
1576                     i->write_index += o->stream->write_index_corrections[j].value;
1577                 }
1578             }
1579
1580             /* Clear old correction entries */
1581             for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1582                 if (!o->stream->write_index_corrections[n].valid)
1583                     continue;
1584
1585                 if (o->stream->write_index_corrections[n].tag <= tag)
1586                     o->stream->write_index_corrections[n].valid = FALSE;
1587             }
1588         }
1589
1590         if (o->stream->direction == PA_STREAM_RECORD) {
1591             /* Read index correction */
1592
1593             if (!i->read_index_corrupt)
1594                 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1595         }
1596
1597         /* Update smoother */
1598         if (o->stream->smoother) {
1599             pa_usec_t u, x;
1600
1601             u = x = pa_rtclock_usec() - i->transport_usec;
1602
1603             if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1604                 pa_usec_t su;
1605
1606                 /* If we weren't playing then it will take some time
1607                  * until the audio will actually come out through the
1608                  * speakers. Since we follow that timing here, we need
1609                  * to try to fix this up */
1610
1611                 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1612
1613                 if (su < i->sink_usec)
1614                     x += i->sink_usec - su;
1615             }
1616
1617             if (!i->playing)
1618                 pa_smoother_pause(o->stream->smoother, x);
1619
1620             /* Update the smoother */
1621             if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1622                 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1623                 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1624
1625             if (i->playing)
1626                 pa_smoother_resume(o->stream->smoother, x);
1627         }
1628     }
1629
1630     o->stream->auto_timing_update_requested = FALSE;
1631
1632     if (o->stream->latency_update_callback)
1633         o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1634
1635     if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1636         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1637         cb(o->stream, o->stream->timing_info_valid, o->userdata);
1638     }
1639
1640 finish:
1641
1642     pa_operation_done(o);
1643     pa_operation_unref(o);
1644 }
1645
1646 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1647     uint32_t tag;
1648     pa_operation *o;
1649     pa_tagstruct *t;
1650     struct timeval now;
1651     int cidx = 0;
1652
1653     pa_assert(s);
1654     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1655
1656     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1657     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1658     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1659
1660     if (s->direction == PA_STREAM_PLAYBACK) {
1661         /* Find a place to store the write_index correction data for this entry */
1662         cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1663
1664         /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1665         PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1666     }
1667     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1668
1669     t = pa_tagstruct_command(
1670             s->context,
1671             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1672             &tag);
1673     pa_tagstruct_putu32(t, s->channel);
1674     pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1675
1676     pa_pstream_send_tagstruct(s->context->pstream, t);
1677     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1678
1679     if (s->direction == PA_STREAM_PLAYBACK) {
1680         /* Fill in initial correction data */
1681
1682         s->current_write_index_correction = cidx;
1683
1684         s->write_index_corrections[cidx].valid = TRUE;
1685         s->write_index_corrections[cidx].absolute = FALSE;
1686         s->write_index_corrections[cidx].corrupt = FALSE;
1687         s->write_index_corrections[cidx].tag = tag;
1688         s->write_index_corrections[cidx].value = 0;
1689     }
1690
1691     return o;
1692 }
1693
1694 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1695     pa_stream *s = userdata;
1696
1697     pa_assert(pd);
1698     pa_assert(s);
1699     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1700
1701     pa_stream_ref(s);
1702
1703     if (command != PA_COMMAND_REPLY) {
1704         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1705             goto finish;
1706
1707         pa_stream_set_state(s, PA_STREAM_FAILED);
1708         goto finish;
1709     } else if (!pa_tagstruct_eof(t)) {
1710         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1711         goto finish;
1712     }
1713
1714     pa_stream_set_state(s, PA_STREAM_TERMINATED);
1715
1716 finish:
1717     pa_stream_unref(s);
1718 }
1719
1720 int pa_stream_disconnect(pa_stream *s) {
1721     pa_tagstruct *t;
1722     uint32_t tag;
1723
1724     pa_assert(s);
1725     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1726
1727     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1728     PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1729     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1730
1731     pa_stream_ref(s);
1732
1733     t = pa_tagstruct_command(
1734             s->context,
1735             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1736                         (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1737             &tag);
1738     pa_tagstruct_putu32(t, s->channel);
1739     pa_pstream_send_tagstruct(s->context->pstream, t);
1740     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1741
1742     pa_stream_unref(s);
1743     return 0;
1744 }
1745
1746 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1747     pa_assert(s);
1748     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1749
1750     if (pa_detect_fork())
1751         return;
1752
1753     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1754         return;
1755
1756     s->read_callback = cb;
1757     s->read_userdata = userdata;
1758 }
1759
1760 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1761     pa_assert(s);
1762     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1763
1764     if (pa_detect_fork())
1765         return;
1766
1767     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1768         return;
1769
1770     s->write_callback = cb;
1771     s->write_userdata = userdata;
1772 }
1773
1774 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1775     pa_assert(s);
1776     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1777
1778     if (pa_detect_fork())
1779         return;
1780
1781     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1782         return;
1783
1784     s->state_callback = cb;
1785     s->state_userdata = userdata;
1786 }
1787
1788 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1789     pa_assert(s);
1790     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1791
1792     if (pa_detect_fork())
1793         return;
1794
1795     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1796         return;
1797
1798     s->overflow_callback = cb;
1799     s->overflow_userdata = userdata;
1800 }
1801
1802 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1803     pa_assert(s);
1804     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1805
1806     if (pa_detect_fork())
1807         return;
1808
1809     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1810         return;
1811
1812     s->underflow_callback = cb;
1813     s->underflow_userdata = userdata;
1814 }
1815
1816 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1817     pa_assert(s);
1818     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1819
1820     if (pa_detect_fork())
1821         return;
1822
1823     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1824         return;
1825
1826     s->latency_update_callback = cb;
1827     s->latency_update_userdata = userdata;
1828 }
1829
1830 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1831     pa_assert(s);
1832     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1833
1834     if (pa_detect_fork())
1835         return;
1836
1837     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1838         return;
1839
1840     s->moved_callback = cb;
1841     s->moved_userdata = userdata;
1842 }
1843
1844 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1845     pa_assert(s);
1846     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1847
1848     if (pa_detect_fork())
1849         return;
1850
1851     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1852         return;
1853
1854     s->suspended_callback = cb;
1855     s->suspended_userdata = userdata;
1856 }
1857
1858 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1859     pa_assert(s);
1860     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1861
1862     if (pa_detect_fork())
1863         return;
1864
1865     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1866         return;
1867
1868     s->started_callback = cb;
1869     s->started_userdata = userdata;
1870 }
1871
1872 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
1873     pa_assert(s);
1874     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1875
1876     if (pa_detect_fork())
1877         return;
1878
1879     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1880         return;
1881
1882     s->event_callback = cb;
1883     s->event_userdata = userdata;
1884 }
1885
1886 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1887     pa_assert(s);
1888     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1889
1890     if (pa_detect_fork())
1891         return;
1892
1893     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1894         return;
1895
1896     s->buffer_attr_callback = cb;
1897     s->buffer_attr_userdata = userdata;
1898 }
1899
1900 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1901     pa_operation *o = userdata;
1902     int success = 1;
1903
1904     pa_assert(pd);
1905     pa_assert(o);
1906     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1907
1908     if (!o->context)
1909         goto finish;
1910
1911     if (command != PA_COMMAND_REPLY) {
1912         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1913             goto finish;
1914
1915         success = 0;
1916     } else if (!pa_tagstruct_eof(t)) {
1917         pa_context_fail(o->context, PA_ERR_PROTOCOL);
1918         goto finish;
1919     }
1920
1921     if (o->callback) {
1922         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1923         cb(o->stream, success, o->userdata);
1924     }
1925
1926 finish:
1927     pa_operation_done(o);
1928     pa_operation_unref(o);
1929 }
1930
1931 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
1932     pa_operation *o;
1933     pa_tagstruct *t;
1934     uint32_t tag;
1935
1936     pa_assert(s);
1937     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1938
1939     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1940     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1941     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1942
1943     s->corked = b;
1944
1945     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1946
1947     t = pa_tagstruct_command(
1948             s->context,
1949             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
1950             &tag);
1951     pa_tagstruct_putu32(t, s->channel);
1952     pa_tagstruct_put_boolean(t, !!b);
1953     pa_pstream_send_tagstruct(s->context->pstream, t);
1954     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1955
1956     check_smoother_status(s, FALSE, FALSE, FALSE);
1957
1958     /* This might cause the indexes to hang/start again, hence
1959      * let's request a timing update */
1960     request_auto_timing_update(s, TRUE);
1961
1962     return o;
1963 }
1964
1965 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
1966     pa_tagstruct *t;
1967     pa_operation *o;
1968     uint32_t tag;
1969
1970     pa_assert(s);
1971     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1972
1973     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1974     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1975
1976     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1977
1978     t = pa_tagstruct_command(s->context, command, &tag);
1979     pa_tagstruct_putu32(t, s->channel);
1980     pa_pstream_send_tagstruct(s->context->pstream, t);
1981     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1982
1983     return o;
1984 }
1985
1986 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1987     pa_operation *o;
1988
1989     pa_assert(s);
1990     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1991
1992     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1993     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1994     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1995
1996     if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
1997         return NULL;
1998
1999     if (s->direction == PA_STREAM_PLAYBACK) {
2000
2001         if (s->write_index_corrections[s->current_write_index_correction].valid)
2002             s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2003
2004         if (s->buffer_attr.prebuf > 0)
2005             check_smoother_status(s, FALSE, FALSE, TRUE);
2006
2007         /* This will change the write index, but leave the
2008          * read index untouched. */
2009         invalidate_indexes(s, FALSE, TRUE);
2010
2011     } else
2012         /* For record streams this has no influence on the write
2013          * index, but the read index might jump. */
2014         invalidate_indexes(s, TRUE, FALSE);
2015
2016     return o;
2017 }
2018
2019 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2020     pa_operation *o;
2021
2022     pa_assert(s);
2023     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2024
2025     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2026     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2027     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2028     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2029
2030     if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2031         return NULL;
2032
2033     /* This might cause the read index to hang again, hence
2034      * let's request a timing update */
2035     request_auto_timing_update(s, TRUE);
2036
2037     return o;
2038 }
2039
2040 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2041     pa_operation *o;
2042
2043     pa_assert(s);
2044     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2045
2046     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2047     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2048     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2049     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2050
2051     if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2052         return NULL;
2053
2054     /* This might cause the read index to start moving again, hence
2055      * let's request a timing update */
2056     request_auto_timing_update(s, TRUE);
2057
2058     return o;
2059 }
2060
2061 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2062     pa_operation *o;
2063
2064     pa_assert(s);
2065     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2066     pa_assert(name);
2067
2068     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2069     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2070     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2071
2072     if (s->context->version >= 13) {
2073         pa_proplist *p = pa_proplist_new();
2074
2075         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2076         o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2077         pa_proplist_free(p);
2078     } else {
2079         pa_tagstruct *t;
2080         uint32_t tag;
2081
2082         o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2083         t = pa_tagstruct_command(
2084                 s->context,
2085                 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2086                 &tag);
2087         pa_tagstruct_putu32(t, s->channel);
2088         pa_tagstruct_puts(t, name);
2089         pa_pstream_send_tagstruct(s->context->pstream, t);
2090         pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2091     }
2092
2093     return o;
2094 }
2095
2096 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2097     pa_usec_t usec;
2098
2099     pa_assert(s);
2100     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2101
2102     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2103     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2104     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2105     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2106     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2107     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2108
2109     if (s->smoother)
2110         usec = pa_smoother_get(s->smoother, pa_rtclock_usec());
2111     else
2112         usec = calc_time(s, FALSE);
2113
2114     /* Make sure the time runs monotonically */
2115     if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2116         if (usec < s->previous_time)
2117             usec = s->previous_time;
2118         else
2119             s->previous_time = usec;
2120     }
2121
2122     if (r_usec)
2123         *r_usec = usec;
2124
2125     return 0;
2126 }
2127
2128 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2129     pa_assert(s);
2130     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2131
2132     if (negative)
2133         *negative = 0;
2134
2135     if (a >= b)
2136         return a-b;
2137     else {
2138         if (negative && s->direction == PA_STREAM_RECORD) {
2139             *negative = 1;
2140             return b-a;
2141         } else
2142             return 0;
2143     }
2144 }
2145
2146 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2147     pa_usec_t t, c;
2148     int r;
2149     int64_t cindex;
2150
2151     pa_assert(s);
2152     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2153     pa_assert(r_usec);
2154
2155     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2156     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2157     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2158     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2159     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2160     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2161
2162     if ((r = pa_stream_get_time(s, &t)) < 0)
2163         return r;
2164
2165     if (s->direction == PA_STREAM_PLAYBACK)
2166         cindex = s->timing_info.write_index;
2167     else
2168         cindex = s->timing_info.read_index;
2169
2170     if (cindex < 0)
2171         cindex = 0;
2172
2173     c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2174
2175     if (s->direction == PA_STREAM_PLAYBACK)
2176         *r_usec = time_counter_diff(s, c, t, negative);
2177     else
2178         *r_usec = time_counter_diff(s, t, c, negative);
2179
2180     return 0;
2181 }
2182
2183 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2184     pa_assert(s);
2185     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2186
2187     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2188     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2189     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2190     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2191
2192     return &s->timing_info;
2193 }
2194
2195 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2196     pa_assert(s);
2197     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2198
2199     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2200
2201     return &s->sample_spec;
2202 }
2203
2204 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2205     pa_assert(s);
2206     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2207
2208     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2209
2210     return &s->channel_map;
2211 }
2212
2213 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2214     pa_assert(s);
2215     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2216
2217     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2218     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2219     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2220
2221     return &s->buffer_attr;
2222 }
2223
2224 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2225     pa_operation *o = userdata;
2226     int success = 1;
2227
2228     pa_assert(pd);
2229     pa_assert(o);
2230     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2231
2232     if (!o->context)
2233         goto finish;
2234
2235     if (command != PA_COMMAND_REPLY) {
2236         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2237             goto finish;
2238
2239         success = 0;
2240     } else {
2241         if (o->stream->direction == PA_STREAM_PLAYBACK) {
2242             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2243                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2244                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2245                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2246                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2247                 goto finish;
2248             }
2249         } else if (o->stream->direction == PA_STREAM_RECORD) {
2250             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2251                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2252                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2253                 goto finish;
2254             }
2255         }
2256
2257         if (o->stream->context->version >= 13) {
2258             pa_usec_t usec;
2259
2260             if (pa_tagstruct_get_usec(t, &usec) < 0) {
2261                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2262                 goto finish;
2263             }
2264
2265             if (o->stream->direction == PA_STREAM_RECORD)
2266                 o->stream->timing_info.configured_source_usec = usec;
2267             else
2268                 o->stream->timing_info.configured_sink_usec = usec;
2269         }
2270
2271         if (!pa_tagstruct_eof(t)) {
2272             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2273             goto finish;
2274         }
2275     }
2276
2277     if (o->callback) {
2278         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2279         cb(o->stream, success, o->userdata);
2280     }
2281
2282 finish:
2283     pa_operation_done(o);
2284     pa_operation_unref(o);
2285 }
2286
2287
2288 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2289     pa_operation *o;
2290     pa_tagstruct *t;
2291     uint32_t tag;
2292
2293     pa_assert(s);
2294     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2295     pa_assert(attr);
2296
2297     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2298     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2299     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2300     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2301
2302     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2303
2304     t = pa_tagstruct_command(
2305             s->context,
2306             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2307             &tag);
2308     pa_tagstruct_putu32(t, s->channel);
2309
2310     pa_tagstruct_putu32(t, attr->maxlength);
2311
2312     if (s->direction == PA_STREAM_PLAYBACK)
2313         pa_tagstruct_put(
2314                 t,
2315                 PA_TAG_U32, attr->tlength,
2316                 PA_TAG_U32, attr->prebuf,
2317                 PA_TAG_U32, attr->minreq,
2318                 PA_TAG_INVALID);
2319     else
2320         pa_tagstruct_putu32(t, attr->fragsize);
2321
2322     if (s->context->version >= 13)
2323         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2324
2325     if (s->context->version >= 14)
2326         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2327
2328     pa_pstream_send_tagstruct(s->context->pstream, t);
2329     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2330
2331     /* This might cause changes in the read/write indexex, hence let's
2332      * request a timing update */
2333     request_auto_timing_update(s, TRUE);
2334
2335     return o;
2336 }
2337
2338 uint32_t pa_stream_get_device_index(pa_stream *s) {
2339     pa_assert(s);
2340     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2341
2342     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2343     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2344     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2345     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2346     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2347
2348     return s->device_index;
2349 }
2350
2351 const char *pa_stream_get_device_name(pa_stream *s) {
2352     pa_assert(s);
2353     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2354
2355     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2356     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2357     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2358     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2359     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2360
2361     return s->device_name;
2362 }
2363
2364 int pa_stream_is_suspended(pa_stream *s) {
2365     pa_assert(s);
2366     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2367
2368     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2369     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2370     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2371     PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2372
2373     return s->suspended;
2374 }
2375
2376 int pa_stream_is_corked(pa_stream *s) {
2377     pa_assert(s);
2378     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2379
2380     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2381     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2382     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2383
2384     return s->corked;
2385 }
2386
2387 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2388     pa_operation *o = userdata;
2389     int success = 1;
2390
2391     pa_assert(pd);
2392     pa_assert(o);
2393     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2394
2395     if (!o->context)
2396         goto finish;
2397
2398     if (command != PA_COMMAND_REPLY) {
2399         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2400             goto finish;
2401
2402         success = 0;
2403     } else {
2404
2405         if (!pa_tagstruct_eof(t)) {
2406             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2407             goto finish;
2408         }
2409     }
2410
2411     o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2412     pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2413
2414     if (o->callback) {
2415         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2416         cb(o->stream, success, o->userdata);
2417     }
2418
2419 finish:
2420     pa_operation_done(o);
2421     pa_operation_unref(o);
2422 }
2423
2424
2425 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2426     pa_operation *o;
2427     pa_tagstruct *t;
2428     uint32_t tag;
2429
2430     pa_assert(s);
2431     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2432
2433     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2434     PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2435     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2436     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2437     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2438     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2439
2440     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2441     o->private = PA_UINT_TO_PTR(rate);
2442
2443     t = pa_tagstruct_command(
2444             s->context,
2445             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2446             &tag);
2447     pa_tagstruct_putu32(t, s->channel);
2448     pa_tagstruct_putu32(t, rate);
2449
2450     pa_pstream_send_tagstruct(s->context->pstream, t);
2451     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2452
2453     return o;
2454 }
2455
2456 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2457     pa_operation *o;
2458     pa_tagstruct *t;
2459     uint32_t tag;
2460
2461     pa_assert(s);
2462     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2463
2464     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2465     PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2466     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2467     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2468     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2469
2470     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2471
2472     t = pa_tagstruct_command(
2473             s->context,
2474             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2475             &tag);
2476     pa_tagstruct_putu32(t, s->channel);
2477     pa_tagstruct_putu32(t, (uint32_t) mode);
2478     pa_tagstruct_put_proplist(t, p);
2479
2480     pa_pstream_send_tagstruct(s->context->pstream, t);
2481     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2482
2483     /* Please note that we don't update s->proplist here, because we
2484      * don't export that field */
2485
2486     return o;
2487 }
2488
2489 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2490     pa_operation *o;
2491     pa_tagstruct *t;
2492     uint32_t tag;
2493     const char * const*k;
2494
2495     pa_assert(s);
2496     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2497
2498     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2499     PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2500     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2501     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2502     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2503
2504     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2505
2506     t = pa_tagstruct_command(
2507             s->context,
2508             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2509             &tag);
2510     pa_tagstruct_putu32(t, s->channel);
2511
2512     for (k = keys; *k; k++)
2513         pa_tagstruct_puts(t, *k);
2514
2515     pa_tagstruct_puts(t, NULL);
2516
2517     pa_pstream_send_tagstruct(s->context->pstream, t);
2518     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2519
2520     /* Please note that we don't update s->proplist here, because we
2521      * don't export that field */
2522
2523     return o;
2524 }
2525
2526 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2527     pa_assert(s);
2528     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2529
2530     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2531     PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2532     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2533     PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2534
2535     s->direct_on_input = sink_input_idx;
2536
2537     return 0;
2538 }
2539
2540 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2541     pa_assert(s);
2542     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2543
2544     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2545     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2546     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2547
2548     return s->direct_on_input;
2549 }