Modify smoothing code to make cubic interpolation optional and allow 'quick fixups...
[profile/ivi/pulseaudio.git] / src / pulse / stream.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/xmalloc.h>
34
35 #include <pulsecore/pstream-util.h>
36 #include <pulsecore/log.h>
37 #include <pulsecore/hashmap.h>
38 #include <pulsecore/macro.h>
39 #include <pulsecore/rtclock.h>
40
41 #include "fork-detect.h"
42 #include "internal.h"
43
44 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
45 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
46
47 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
48 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
49 #define SMOOTHER_MIN_HISTORY (4)
50
51 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
52     return pa_stream_new_with_proplist(c, name, ss, map, NULL);
53 }
54
55 static void reset_callbacks(pa_stream *s) {
56     s->read_callback = NULL;
57     s->read_userdata = NULL;
58     s->write_callback = NULL;
59     s->write_userdata = NULL;
60     s->state_callback = NULL;
61     s->state_userdata = NULL;
62     s->overflow_callback = NULL;
63     s->overflow_userdata = NULL;
64     s->underflow_callback = NULL;
65     s->underflow_userdata = NULL;
66     s->latency_update_callback = NULL;
67     s->latency_update_userdata = NULL;
68     s->moved_callback = NULL;
69     s->moved_userdata = NULL;
70     s->suspended_callback = NULL;
71     s->suspended_userdata = NULL;
72     s->started_callback = NULL;
73     s->started_userdata = NULL;
74     s->event_callback = NULL;
75     s->event_userdata = NULL;
76     s->buffer_attr_callback = NULL;
77     s->buffer_attr_userdata = NULL;
78 }
79
80 pa_stream *pa_stream_new_with_proplist(
81         pa_context *c,
82         const char *name,
83         const pa_sample_spec *ss,
84         const pa_channel_map *map,
85         pa_proplist *p) {
86
87     pa_stream *s;
88     int i;
89     pa_channel_map tmap;
90
91     pa_assert(c);
92     pa_assert(PA_REFCNT_VALUE(c) >= 1);
93
94     PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
95     PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
96     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
97     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
98     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
99     PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
100     PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
101
102     if (!map)
103         PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
104
105     s = pa_xnew(pa_stream, 1);
106     PA_REFCNT_INIT(s);
107     s->context = c;
108     s->mainloop = c->mainloop;
109
110     s->direction = PA_STREAM_NODIRECTION;
111     s->state = PA_STREAM_UNCONNECTED;
112     s->flags = 0;
113
114     s->sample_spec = *ss;
115     s->channel_map = *map;
116
117     s->direct_on_input = PA_INVALID_INDEX;
118
119     s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
120     if (name)
121         pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
122
123     s->channel = 0;
124     s->channel_valid = FALSE;
125     s->syncid = c->csyncid++;
126     s->stream_index = PA_INVALID_INDEX;
127
128     s->requested_bytes = 0;
129     memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
130
131     /* We initialize der target length here, so that if the user
132      * passes no explicit buffering metrics the default is similar to
133      * what older PA versions provided. */
134
135     s->buffer_attr.maxlength = (uint32_t) -1;
136     s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
137     s->buffer_attr.minreq = (uint32_t) -1;
138     s->buffer_attr.prebuf = (uint32_t) -1;
139     s->buffer_attr.fragsize = (uint32_t) -1;
140
141     s->device_index = PA_INVALID_INDEX;
142     s->device_name = NULL;
143     s->suspended = FALSE;
144     s->corked = FALSE;
145
146     pa_memchunk_reset(&s->peek_memchunk);
147     s->peek_data = NULL;
148
149     s->record_memblockq = NULL;
150
151
152     memset(&s->timing_info, 0, sizeof(s->timing_info));
153     s->timing_info_valid = FALSE;
154
155     s->previous_time = 0;
156
157     s->read_index_not_before = 0;
158     s->write_index_not_before = 0;
159     for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
160         s->write_index_corrections[i].valid = 0;
161     s->current_write_index_correction = 0;
162
163     s->auto_timing_update_event = NULL;
164     s->auto_timing_update_requested = FALSE;
165     s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
166
167     reset_callbacks(s);
168
169     s->smoother = NULL;
170
171     /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
172     PA_LLIST_PREPEND(pa_stream, c->streams, s);
173     pa_stream_ref(s);
174
175     return s;
176 }
177
178 static void stream_unlink(pa_stream *s) {
179     pa_operation *o, *n;
180     pa_assert(s);
181
182     if (!s->context)
183         return;
184
185     /* Detach from context */
186
187     /* Unref all operatio object that point to us */
188     for (o = s->context->operations; o; o = n) {
189         n = o->next;
190
191         if (o->stream == s)
192             pa_operation_cancel(o);
193     }
194
195     /* Drop all outstanding replies for this stream */
196     if (s->context->pdispatch)
197         pa_pdispatch_unregister_reply(s->context->pdispatch, s);
198
199     if (s->channel_valid) {
200         pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL);
201         s->channel = 0;
202         s->channel_valid = FALSE;
203     }
204
205     PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
206     pa_stream_unref(s);
207
208     s->context = NULL;
209
210     if (s->auto_timing_update_event) {
211         pa_assert(s->mainloop);
212         s->mainloop->time_free(s->auto_timing_update_event);
213     }
214
215     reset_callbacks(s);
216 }
217
218 static void stream_free(pa_stream *s) {
219     pa_assert(s);
220
221     stream_unlink(s);
222
223     if (s->peek_memchunk.memblock) {
224         if (s->peek_data)
225             pa_memblock_release(s->peek_memchunk.memblock);
226         pa_memblock_unref(s->peek_memchunk.memblock);
227     }
228
229     if (s->record_memblockq)
230         pa_memblockq_free(s->record_memblockq);
231
232     if (s->proplist)
233         pa_proplist_free(s->proplist);
234
235     if (s->smoother)
236         pa_smoother_free(s->smoother);
237
238     pa_xfree(s->device_name);
239     pa_xfree(s);
240 }
241
242 void pa_stream_unref(pa_stream *s) {
243     pa_assert(s);
244     pa_assert(PA_REFCNT_VALUE(s) >= 1);
245
246     if (PA_REFCNT_DEC(s) <= 0)
247         stream_free(s);
248 }
249
250 pa_stream* pa_stream_ref(pa_stream *s) {
251     pa_assert(s);
252     pa_assert(PA_REFCNT_VALUE(s) >= 1);
253
254     PA_REFCNT_INC(s);
255     return s;
256 }
257
258 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
259     pa_assert(s);
260     pa_assert(PA_REFCNT_VALUE(s) >= 1);
261
262     return s->state;
263 }
264
265 pa_context* pa_stream_get_context(pa_stream *s) {
266     pa_assert(s);
267     pa_assert(PA_REFCNT_VALUE(s) >= 1);
268
269     return s->context;
270 }
271
272 uint32_t pa_stream_get_index(pa_stream *s) {
273     pa_assert(s);
274     pa_assert(PA_REFCNT_VALUE(s) >= 1);
275
276     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
277     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
278
279     return s->stream_index;
280 }
281
282 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
283     pa_assert(s);
284     pa_assert(PA_REFCNT_VALUE(s) >= 1);
285
286     if (s->state == st)
287         return;
288
289     pa_stream_ref(s);
290
291     s->state = st;
292
293     if (s->state_callback)
294         s->state_callback(s, s->state_userdata);
295
296     if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
297         stream_unlink(s);
298
299     pa_stream_unref(s);
300 }
301
302 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
303     pa_assert(s);
304     pa_assert(PA_REFCNT_VALUE(s) >= 1);
305
306     if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
307         return;
308
309     if (s->state == PA_STREAM_READY &&
310         (force || !s->auto_timing_update_requested)) {
311         pa_operation *o;
312
313 /*         pa_log("Automatically requesting new timing data"); */
314
315         if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
316             pa_operation_unref(o);
317             s->auto_timing_update_requested = TRUE;
318         }
319     }
320
321     if (s->auto_timing_update_event) {
322         struct timeval next;
323
324         if (force)
325             s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
326
327         pa_gettimeofday(&next);
328         pa_timeval_add(&next, s->auto_timing_interval_usec);
329         s->mainloop->time_restart(s->auto_timing_update_event, &next);
330
331         s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
332     }
333 }
334
335 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
336     pa_context *c = userdata;
337     pa_stream *s;
338     uint32_t channel;
339
340     pa_assert(pd);
341     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
342     pa_assert(t);
343     pa_assert(c);
344     pa_assert(PA_REFCNT_VALUE(c) >= 1);
345
346     pa_context_ref(c);
347
348     if (pa_tagstruct_getu32(t, &channel) < 0 ||
349         !pa_tagstruct_eof(t)) {
350         pa_context_fail(c, PA_ERR_PROTOCOL);
351         goto finish;
352     }
353
354     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel)))
355         goto finish;
356
357     if (s->state != PA_STREAM_READY)
358         goto finish;
359
360     pa_context_set_error(c, PA_ERR_KILLED);
361     pa_stream_set_state(s, PA_STREAM_FAILED);
362
363 finish:
364     pa_context_unref(c);
365 }
366
367 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
368     pa_usec_t x;
369
370     pa_assert(s);
371     pa_assert(!force_start || !force_stop);
372
373     if (!s->smoother)
374         return;
375
376     x = pa_rtclock_usec();
377
378     if (s->timing_info_valid) {
379         if (aposteriori)
380             x -= s->timing_info.transport_usec;
381         else
382             x += s->timing_info.transport_usec;
383
384         if (s->direction == PA_STREAM_PLAYBACK)
385             /* it takes a while until the pause/resume is actually
386              * audible */
387             x += s->timing_info.sink_usec;
388         else
389             /* Data froma  while back will be dropped */
390             x -= s->timing_info.source_usec;
391     }
392
393     if (s->suspended || s->corked || force_stop)
394         pa_smoother_pause(s->smoother, x);
395     else if (force_start || s->buffer_attr.prebuf == 0)
396         pa_smoother_resume(s->smoother, x, TRUE);
397
398
399     /* Please note that we have no idea if playback actually started
400      * if prebuf is non-zero! */
401 }
402
403 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
404     pa_context *c = userdata;
405     pa_stream *s;
406     uint32_t channel;
407     const char *dn;
408     pa_bool_t suspended;
409     uint32_t di;
410     pa_usec_t usec = 0;
411     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
412
413     pa_assert(pd);
414     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
415     pa_assert(t);
416     pa_assert(c);
417     pa_assert(PA_REFCNT_VALUE(c) >= 1);
418
419     pa_context_ref(c);
420
421     if (c->version < 12) {
422         pa_context_fail(c, PA_ERR_PROTOCOL);
423         goto finish;
424     }
425
426     if (pa_tagstruct_getu32(t, &channel) < 0 ||
427         pa_tagstruct_getu32(t, &di) < 0 ||
428         pa_tagstruct_gets(t, &dn) < 0 ||
429         pa_tagstruct_get_boolean(t, &suspended) < 0) {
430         pa_context_fail(c, PA_ERR_PROTOCOL);
431         goto finish;
432     }
433
434     if (c->version >= 13) {
435
436         if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
437             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
438                 pa_tagstruct_getu32(t, &fragsize) < 0 ||
439                 pa_tagstruct_get_usec(t, &usec) < 0) {
440                 pa_context_fail(c, PA_ERR_PROTOCOL);
441                 goto finish;
442             }
443         } else {
444             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
445                 pa_tagstruct_getu32(t, &tlength) < 0 ||
446                 pa_tagstruct_getu32(t, &prebuf) < 0 ||
447                 pa_tagstruct_getu32(t, &minreq) < 0 ||
448                 pa_tagstruct_get_usec(t, &usec) < 0) {
449                 pa_context_fail(c, PA_ERR_PROTOCOL);
450                 goto finish;
451             }
452         }
453     }
454
455     if (!pa_tagstruct_eof(t)) {
456         pa_context_fail(c, PA_ERR_PROTOCOL);
457         goto finish;
458     }
459
460     if (!dn || di == PA_INVALID_INDEX) {
461         pa_context_fail(c, PA_ERR_PROTOCOL);
462         goto finish;
463     }
464
465     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, channel)))
466         goto finish;
467
468     if (s->state != PA_STREAM_READY)
469         goto finish;
470
471     if (c->version >= 13) {
472         if (s->direction == PA_STREAM_RECORD)
473             s->timing_info.configured_source_usec = usec;
474         else
475             s->timing_info.configured_sink_usec = usec;
476
477         s->buffer_attr.maxlength = maxlength;
478         s->buffer_attr.fragsize = fragsize;
479         s->buffer_attr.tlength = tlength;
480         s->buffer_attr.prebuf = prebuf;
481         s->buffer_attr.minreq = minreq;
482     }
483
484     pa_xfree(s->device_name);
485     s->device_name = pa_xstrdup(dn);
486     s->device_index = di;
487
488     s->suspended = suspended;
489
490     check_smoother_status(s, TRUE, FALSE, FALSE);
491     request_auto_timing_update(s, TRUE);
492
493     if (s->moved_callback)
494         s->moved_callback(s, s->moved_userdata);
495
496 finish:
497     pa_context_unref(c);
498 }
499
500 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
501     pa_context *c = userdata;
502     pa_stream *s;
503     uint32_t channel;
504     pa_usec_t usec = 0;
505     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
506
507     pa_assert(pd);
508     pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
509     pa_assert(t);
510     pa_assert(c);
511     pa_assert(PA_REFCNT_VALUE(c) >= 1);
512
513     pa_context_ref(c);
514
515     if (c->version < 15) {
516         pa_context_fail(c, PA_ERR_PROTOCOL);
517         goto finish;
518     }
519
520     if (pa_tagstruct_getu32(t, &channel) < 0) {
521         pa_context_fail(c, PA_ERR_PROTOCOL);
522         goto finish;
523     }
524
525     if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
526         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
527             pa_tagstruct_getu32(t, &fragsize) < 0 ||
528             pa_tagstruct_get_usec(t, &usec) < 0) {
529             pa_context_fail(c, PA_ERR_PROTOCOL);
530             goto finish;
531         }
532     } else {
533         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
534             pa_tagstruct_getu32(t, &tlength) < 0 ||
535             pa_tagstruct_getu32(t, &prebuf) < 0 ||
536             pa_tagstruct_getu32(t, &minreq) < 0 ||
537             pa_tagstruct_get_usec(t, &usec) < 0) {
538             pa_context_fail(c, PA_ERR_PROTOCOL);
539             goto finish;
540         }
541     }
542
543     if (!pa_tagstruct_eof(t)) {
544         pa_context_fail(c, PA_ERR_PROTOCOL);
545         goto finish;
546     }
547
548     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, channel)))
549         goto finish;
550
551     if (s->state != PA_STREAM_READY)
552         goto finish;
553
554     if (s->direction == PA_STREAM_RECORD)
555         s->timing_info.configured_source_usec = usec;
556     else
557         s->timing_info.configured_sink_usec = usec;
558
559     s->buffer_attr.maxlength = maxlength;
560     s->buffer_attr.fragsize = fragsize;
561     s->buffer_attr.tlength = tlength;
562     s->buffer_attr.prebuf = prebuf;
563     s->buffer_attr.minreq = minreq;
564
565     request_auto_timing_update(s, TRUE);
566
567     if (s->buffer_attr_callback)
568         s->buffer_attr_callback(s, s->buffer_attr_userdata);
569
570 finish:
571     pa_context_unref(c);
572 }
573
574 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
575     pa_context *c = userdata;
576     pa_stream *s;
577     uint32_t channel;
578     pa_bool_t suspended;
579
580     pa_assert(pd);
581     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
582     pa_assert(t);
583     pa_assert(c);
584     pa_assert(PA_REFCNT_VALUE(c) >= 1);
585
586     pa_context_ref(c);
587
588     if (c->version < 12) {
589         pa_context_fail(c, PA_ERR_PROTOCOL);
590         goto finish;
591     }
592
593     if (pa_tagstruct_getu32(t, &channel) < 0 ||
594         pa_tagstruct_get_boolean(t, &suspended) < 0 ||
595         !pa_tagstruct_eof(t)) {
596         pa_context_fail(c, PA_ERR_PROTOCOL);
597         goto finish;
598     }
599
600     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, channel)))
601         goto finish;
602
603     if (s->state != PA_STREAM_READY)
604         goto finish;
605
606     s->suspended = suspended;
607
608     check_smoother_status(s, TRUE, FALSE, FALSE);
609     request_auto_timing_update(s, TRUE);
610
611     if (s->suspended_callback)
612         s->suspended_callback(s, s->suspended_userdata);
613
614 finish:
615     pa_context_unref(c);
616 }
617
618 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
619     pa_context *c = userdata;
620     pa_stream *s;
621     uint32_t channel;
622
623     pa_assert(pd);
624     pa_assert(command == PA_COMMAND_STARTED);
625     pa_assert(t);
626     pa_assert(c);
627     pa_assert(PA_REFCNT_VALUE(c) >= 1);
628
629     pa_context_ref(c);
630
631     if (c->version < 13) {
632         pa_context_fail(c, PA_ERR_PROTOCOL);
633         goto finish;
634     }
635
636     if (pa_tagstruct_getu32(t, &channel) < 0 ||
637         !pa_tagstruct_eof(t)) {
638         pa_context_fail(c, PA_ERR_PROTOCOL);
639         goto finish;
640     }
641
642     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
643         goto finish;
644
645     if (s->state != PA_STREAM_READY)
646         goto finish;
647
648     check_smoother_status(s, TRUE, TRUE, FALSE);
649     request_auto_timing_update(s, TRUE);
650
651     if (s->started_callback)
652         s->started_callback(s, s->started_userdata);
653
654 finish:
655     pa_context_unref(c);
656 }
657
658 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
659     pa_context *c = userdata;
660     pa_stream *s;
661     uint32_t channel;
662     pa_proplist *pl = NULL;
663     const char *event;
664
665     pa_assert(pd);
666     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
667     pa_assert(t);
668     pa_assert(c);
669     pa_assert(PA_REFCNT_VALUE(c) >= 1);
670
671     pa_context_ref(c);
672
673     if (c->version < 15) {
674         pa_context_fail(c, PA_ERR_PROTOCOL);
675         goto finish;
676     }
677
678     pl = pa_proplist_new();
679
680     if (pa_tagstruct_getu32(t, &channel) < 0 ||
681         pa_tagstruct_gets(t, &event) < 0 ||
682         pa_tagstruct_get_proplist(t, pl) < 0 ||
683         !pa_tagstruct_eof(t) || !event) {
684         pa_context_fail(c, PA_ERR_PROTOCOL);
685         goto finish;
686     }
687
688     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, channel)))
689         goto finish;
690
691     if (s->state != PA_STREAM_READY)
692         goto finish;
693
694     if (s->event_callback)
695         s->event_callback(s, event, pl, s->event_userdata);
696
697 finish:
698     pa_context_unref(c);
699
700     if (pl)
701         pa_proplist_free(pl);
702 }
703
704 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
705     pa_stream *s;
706     pa_context *c = userdata;
707     uint32_t bytes, channel;
708
709     pa_assert(pd);
710     pa_assert(command == PA_COMMAND_REQUEST);
711     pa_assert(t);
712     pa_assert(c);
713     pa_assert(PA_REFCNT_VALUE(c) >= 1);
714
715     pa_context_ref(c);
716
717     if (pa_tagstruct_getu32(t, &channel) < 0 ||
718         pa_tagstruct_getu32(t, &bytes) < 0 ||
719         !pa_tagstruct_eof(t)) {
720         pa_context_fail(c, PA_ERR_PROTOCOL);
721         goto finish;
722     }
723
724     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
725         goto finish;
726
727     if (s->state != PA_STREAM_READY)
728         goto finish;
729
730     s->requested_bytes += bytes;
731
732     if (s->requested_bytes > 0 && s->write_callback)
733         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
734
735 finish:
736     pa_context_unref(c);
737 }
738
739 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
740     pa_stream *s;
741     pa_context *c = userdata;
742     uint32_t channel;
743
744     pa_assert(pd);
745     pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
746     pa_assert(t);
747     pa_assert(c);
748     pa_assert(PA_REFCNT_VALUE(c) >= 1);
749
750     pa_context_ref(c);
751
752     if (pa_tagstruct_getu32(t, &channel) < 0 ||
753         !pa_tagstruct_eof(t)) {
754         pa_context_fail(c, PA_ERR_PROTOCOL);
755         goto finish;
756     }
757
758     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
759         goto finish;
760
761     if (s->state != PA_STREAM_READY)
762         goto finish;
763
764     if (s->buffer_attr.prebuf > 0)
765         check_smoother_status(s, TRUE, FALSE, TRUE);
766
767     request_auto_timing_update(s, TRUE);
768
769     if (command == PA_COMMAND_OVERFLOW) {
770         if (s->overflow_callback)
771             s->overflow_callback(s, s->overflow_userdata);
772     } else if (command == PA_COMMAND_UNDERFLOW) {
773         if (s->underflow_callback)
774             s->underflow_callback(s, s->underflow_userdata);
775     }
776
777  finish:
778     pa_context_unref(c);
779 }
780
781 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
782     pa_assert(s);
783     pa_assert(PA_REFCNT_VALUE(s) >= 1);
784
785 /*     pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
786
787     if (s->state != PA_STREAM_READY)
788         return;
789
790     if (w) {
791         s->write_index_not_before = s->context->ctag;
792
793         if (s->timing_info_valid)
794             s->timing_info.write_index_corrupt = TRUE;
795
796 /*         pa_log("write_index invalidated"); */
797     }
798
799     if (r) {
800         s->read_index_not_before = s->context->ctag;
801
802         if (s->timing_info_valid)
803             s->timing_info.read_index_corrupt = TRUE;
804
805 /*         pa_log("read_index invalidated"); */
806     }
807
808     request_auto_timing_update(s, TRUE);
809 }
810
811 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *tv, void *userdata) {
812     pa_stream *s = userdata;
813
814     pa_assert(s);
815     pa_assert(PA_REFCNT_VALUE(s) >= 1);
816
817     pa_stream_ref(s);
818     request_auto_timing_update(s, FALSE);
819     pa_stream_unref(s);
820 }
821
822 static void create_stream_complete(pa_stream *s) {
823     pa_assert(s);
824     pa_assert(PA_REFCNT_VALUE(s) >= 1);
825     pa_assert(s->state == PA_STREAM_CREATING);
826
827     pa_stream_set_state(s, PA_STREAM_READY);
828
829     if (s->requested_bytes > 0 && s->write_callback)
830         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
831
832     if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
833         struct timeval tv;
834         pa_gettimeofday(&tv);
835         s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
836         pa_timeval_add(&tv, s->auto_timing_interval_usec);
837         pa_assert(!s->auto_timing_update_event);
838         s->auto_timing_update_event = s->mainloop->time_new(s->mainloop, &tv, &auto_timing_update_callback, s);
839
840         request_auto_timing_update(s, TRUE);
841     }
842
843     check_smoother_status(s, TRUE, FALSE, FALSE);
844 }
845
846 static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_sample_spec *ss) {
847     pa_assert(s);
848     pa_assert(attr);
849     pa_assert(ss);
850
851     if (s->context->version >= 13)
852         return;
853
854     /* Version older than 0.9.10 didn't do server side buffer_attr
855      * selection, hence we have to fake it on the client side. */
856
857     /* We choose fairly conservative values here, to not confuse
858      * old clients with extremely large playback buffers */
859
860     if (attr->maxlength == (uint32_t) -1)
861         attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
862
863     if (attr->tlength == (uint32_t) -1)
864         attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
865
866     if (attr->minreq == (uint32_t) -1)
867         attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
868
869     if (attr->prebuf == (uint32_t) -1)
870         attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
871
872     if (attr->fragsize == (uint32_t) -1)
873         attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
874 }
875
876 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
877     pa_stream *s = userdata;
878     uint32_t requested_bytes;
879
880     pa_assert(pd);
881     pa_assert(s);
882     pa_assert(PA_REFCNT_VALUE(s) >= 1);
883     pa_assert(s->state == PA_STREAM_CREATING);
884
885     pa_stream_ref(s);
886
887     if (command != PA_COMMAND_REPLY) {
888         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
889             goto finish;
890
891         pa_stream_set_state(s, PA_STREAM_FAILED);
892         goto finish;
893     }
894
895     if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
896         s->channel == PA_INVALID_INDEX ||
897         ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 ||  s->stream_index == PA_INVALID_INDEX)) ||
898         ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
899         pa_context_fail(s->context, PA_ERR_PROTOCOL);
900         goto finish;
901     }
902
903     s->requested_bytes = (int64_t) requested_bytes;
904
905     if (s->context->version >= 9) {
906         if (s->direction == PA_STREAM_PLAYBACK) {
907             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
908                 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
909                 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
910                 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
911                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
912                 goto finish;
913             }
914         } else if (s->direction == PA_STREAM_RECORD) {
915             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
916                 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
917                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
918                 goto finish;
919             }
920         }
921     }
922
923     if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
924         pa_sample_spec ss;
925         pa_channel_map cm;
926         const char *dn = NULL;
927         pa_bool_t suspended;
928
929         if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
930             pa_tagstruct_get_channel_map(t, &cm) < 0 ||
931             pa_tagstruct_getu32(t, &s->device_index) < 0 ||
932             pa_tagstruct_gets(t, &dn) < 0 ||
933             pa_tagstruct_get_boolean(t, &suspended) < 0) {
934             pa_context_fail(s->context, PA_ERR_PROTOCOL);
935             goto finish;
936         }
937
938         if (!dn || s->device_index == PA_INVALID_INDEX ||
939             ss.channels != cm.channels ||
940             !pa_channel_map_valid(&cm) ||
941             !pa_sample_spec_valid(&ss) ||
942             (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
943             (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
944             (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))) {
945             pa_context_fail(s->context, PA_ERR_PROTOCOL);
946             goto finish;
947         }
948
949         pa_xfree(s->device_name);
950         s->device_name = pa_xstrdup(dn);
951         s->suspended = suspended;
952
953         s->channel_map = cm;
954         s->sample_spec = ss;
955     }
956
957     if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
958         pa_usec_t usec;
959
960         if (pa_tagstruct_get_usec(t, &usec) < 0) {
961             pa_context_fail(s->context, PA_ERR_PROTOCOL);
962             goto finish;
963         }
964
965         if (s->direction == PA_STREAM_RECORD)
966             s->timing_info.configured_source_usec = usec;
967         else
968             s->timing_info.configured_sink_usec = usec;
969     }
970
971     if (!pa_tagstruct_eof(t)) {
972         pa_context_fail(s->context, PA_ERR_PROTOCOL);
973         goto finish;
974     }
975
976     if (s->direction == PA_STREAM_RECORD) {
977         pa_assert(!s->record_memblockq);
978
979         s->record_memblockq = pa_memblockq_new(
980                 0,
981                 s->buffer_attr.maxlength,
982                 0,
983                 pa_frame_size(&s->sample_spec),
984                 1,
985                 0,
986                 0,
987                 NULL);
988     }
989
990     s->channel_valid = TRUE;
991     pa_dynarray_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, s->channel, s);
992
993     create_stream_complete(s);
994
995 finish:
996     pa_stream_unref(s);
997 }
998
999 static int create_stream(
1000         pa_stream_direction_t direction,
1001         pa_stream *s,
1002         const char *dev,
1003         const pa_buffer_attr *attr,
1004         pa_stream_flags_t flags,
1005         const pa_cvolume *volume,
1006         pa_stream *sync_stream) {
1007
1008     pa_tagstruct *t;
1009     uint32_t tag;
1010     pa_bool_t volume_set = FALSE;
1011
1012     pa_assert(s);
1013     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1014     pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1015
1016     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1017     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1018     PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1019     PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1020                                               PA_STREAM_INTERPOLATE_TIMING|
1021                                               PA_STREAM_NOT_MONOTONIC|
1022                                               PA_STREAM_AUTO_TIMING_UPDATE|
1023                                               PA_STREAM_NO_REMAP_CHANNELS|
1024                                               PA_STREAM_NO_REMIX_CHANNELS|
1025                                               PA_STREAM_FIX_FORMAT|
1026                                               PA_STREAM_FIX_RATE|
1027                                               PA_STREAM_FIX_CHANNELS|
1028                                               PA_STREAM_DONT_MOVE|
1029                                               PA_STREAM_VARIABLE_RATE|
1030                                               PA_STREAM_PEAK_DETECT|
1031                                               PA_STREAM_START_MUTED|
1032                                               PA_STREAM_ADJUST_LATENCY|
1033                                               PA_STREAM_EARLY_REQUESTS|
1034                                               PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1035                                               PA_STREAM_START_UNMUTED|
1036                                               PA_STREAM_FAIL_ON_SUSPEND)), PA_ERR_INVALID);
1037
1038     PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1039     PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1040     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1041     /* Althought some of the other flags are not supported on older
1042      * version, we don't check for them here, because it doesn't hurt
1043      * when they are passed but actually not supported. This makes
1044      * client development easier */
1045
1046     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1047     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1048     PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
1049     PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1050     PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1051
1052     pa_stream_ref(s);
1053
1054     s->direction = direction;
1055     s->flags = flags;
1056     s->corked = !!(flags & PA_STREAM_START_CORKED);
1057
1058     if (sync_stream)
1059         s->syncid = sync_stream->syncid;
1060
1061     if (attr)
1062         s->buffer_attr = *attr;
1063     automatic_buffer_attr(s, &s->buffer_attr, &s->sample_spec);
1064
1065     if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1066         pa_usec_t x;
1067
1068         x = pa_rtclock_usec();
1069
1070         pa_assert(!s->smoother);
1071         s->smoother = pa_smoother_new(
1072                 SMOOTHER_ADJUST_TIME,
1073                 SMOOTHER_HISTORY_TIME,
1074                 !(flags & PA_STREAM_NOT_MONOTONIC),
1075                 TRUE,
1076                 SMOOTHER_MIN_HISTORY,
1077                 x,
1078                 TRUE);
1079     }
1080
1081     if (!dev)
1082         dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1083
1084     t = pa_tagstruct_command(
1085             s->context,
1086             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1087             &tag);
1088
1089     if (s->context->version < 13)
1090         pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1091
1092     pa_tagstruct_put(
1093             t,
1094             PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1095             PA_TAG_CHANNEL_MAP, &s->channel_map,
1096             PA_TAG_U32, PA_INVALID_INDEX,
1097             PA_TAG_STRING, dev,
1098             PA_TAG_U32, s->buffer_attr.maxlength,
1099             PA_TAG_BOOLEAN, s->corked,
1100             PA_TAG_INVALID);
1101
1102     if (s->direction == PA_STREAM_PLAYBACK) {
1103         pa_cvolume cv;
1104
1105         pa_tagstruct_put(
1106                 t,
1107                 PA_TAG_U32, s->buffer_attr.tlength,
1108                 PA_TAG_U32, s->buffer_attr.prebuf,
1109                 PA_TAG_U32, s->buffer_attr.minreq,
1110                 PA_TAG_U32, s->syncid,
1111                 PA_TAG_INVALID);
1112
1113         volume_set = !!volume;
1114
1115         if (!volume)
1116             volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1117
1118         pa_tagstruct_put_cvolume(t, volume);
1119     } else
1120         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1121
1122     if (s->context->version >= 12) {
1123         pa_tagstruct_put(
1124                 t,
1125                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1126                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1127                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1128                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1129                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1130                 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1131                 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1132                 PA_TAG_INVALID);
1133     }
1134
1135     if (s->context->version >= 13) {
1136
1137         if (s->direction == PA_STREAM_PLAYBACK)
1138             pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1139         else
1140             pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1141
1142         pa_tagstruct_put(
1143                 t,
1144                 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1145                 PA_TAG_PROPLIST, s->proplist,
1146                 PA_TAG_INVALID);
1147
1148         if (s->direction == PA_STREAM_RECORD)
1149             pa_tagstruct_putu32(t, s->direct_on_input);
1150     }
1151
1152     if (s->context->version >= 14) {
1153
1154         if (s->direction == PA_STREAM_PLAYBACK)
1155             pa_tagstruct_put_boolean(t, volume_set);
1156
1157         pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1158     }
1159
1160     if (s->context->version >= 15) {
1161
1162         if (s->direction == PA_STREAM_PLAYBACK)
1163             pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1164
1165         pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1166         pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1167     }
1168
1169     pa_pstream_send_tagstruct(s->context->pstream, t);
1170     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1171
1172     pa_stream_set_state(s, PA_STREAM_CREATING);
1173
1174     pa_stream_unref(s);
1175     return 0;
1176 }
1177
1178 int pa_stream_connect_playback(
1179         pa_stream *s,
1180         const char *dev,
1181         const pa_buffer_attr *attr,
1182         pa_stream_flags_t flags,
1183         pa_cvolume *volume,
1184         pa_stream *sync_stream) {
1185
1186     pa_assert(s);
1187     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1188
1189     return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1190 }
1191
1192 int pa_stream_connect_record(
1193         pa_stream *s,
1194         const char *dev,
1195         const pa_buffer_attr *attr,
1196         pa_stream_flags_t flags) {
1197
1198     pa_assert(s);
1199     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1200
1201     return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1202 }
1203
1204 int pa_stream_write(
1205         pa_stream *s,
1206         const void *data,
1207         size_t length,
1208         void (*free_cb)(void *p),
1209         int64_t offset,
1210         pa_seek_mode_t seek) {
1211
1212     pa_memchunk chunk;
1213     pa_seek_mode_t t_seek;
1214     int64_t t_offset;
1215     size_t t_length;
1216     const void *t_data;
1217
1218     pa_assert(s);
1219     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1220     pa_assert(data);
1221
1222     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1223     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1224     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1225     PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1226     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1227
1228     if (length <= 0)
1229         return 0;
1230
1231     t_seek = seek;
1232     t_offset = offset;
1233     t_length = length;
1234     t_data = data;
1235
1236     while (t_length > 0) {
1237
1238         chunk.index = 0;
1239
1240         if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1241             chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1242             chunk.length = t_length;
1243         } else {
1244             void *d;
1245
1246             chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1247             chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1248
1249             d = pa_memblock_acquire(chunk.memblock);
1250             memcpy(d, t_data, chunk.length);
1251             pa_memblock_release(chunk.memblock);
1252         }
1253
1254         pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1255
1256         t_offset = 0;
1257         t_seek = PA_SEEK_RELATIVE;
1258
1259         t_data = (const uint8_t*) t_data + chunk.length;
1260         t_length -= chunk.length;
1261
1262         pa_memblock_unref(chunk.memblock);
1263     }
1264
1265     if (free_cb && pa_pstream_get_shm(s->context->pstream))
1266         free_cb((void*) data);
1267
1268     /* This is obviously wrong since we ignore the seeking index . But
1269      * that's OK, the server side applies the same error */
1270     s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1271
1272     if (s->direction == PA_STREAM_PLAYBACK) {
1273
1274         /* Update latency request correction */
1275         if (s->write_index_corrections[s->current_write_index_correction].valid) {
1276
1277             if (seek == PA_SEEK_ABSOLUTE) {
1278                 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1279                 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1280                 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1281             } else if (seek == PA_SEEK_RELATIVE) {
1282                 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1283                     s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1284             } else
1285                 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1286         }
1287
1288         /* Update the write index in the already available latency data */
1289         if (s->timing_info_valid) {
1290
1291             if (seek == PA_SEEK_ABSOLUTE) {
1292                 s->timing_info.write_index_corrupt = FALSE;
1293                 s->timing_info.write_index = offset + (int64_t) length;
1294             } else if (seek == PA_SEEK_RELATIVE) {
1295                 if (!s->timing_info.write_index_corrupt)
1296                     s->timing_info.write_index += offset + (int64_t) length;
1297             } else
1298                 s->timing_info.write_index_corrupt = TRUE;
1299         }
1300
1301         if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1302             request_auto_timing_update(s, TRUE);
1303     }
1304
1305     return 0;
1306 }
1307
1308 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1309     pa_assert(s);
1310     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1311     pa_assert(data);
1312     pa_assert(length);
1313
1314     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1315     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1316     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1317
1318     if (!s->peek_memchunk.memblock) {
1319
1320         if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1321             *data = NULL;
1322             *length = 0;
1323             return 0;
1324         }
1325
1326         s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1327     }
1328
1329     pa_assert(s->peek_data);
1330     *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1331     *length = s->peek_memchunk.length;
1332     return 0;
1333 }
1334
1335 int pa_stream_drop(pa_stream *s) {
1336     pa_assert(s);
1337     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1338
1339     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1340     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1341     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1342     PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1343
1344     pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1345
1346     /* Fix the simulated local read index */
1347     if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1348         s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1349
1350     pa_assert(s->peek_data);
1351     pa_memblock_release(s->peek_memchunk.memblock);
1352     pa_memblock_unref(s->peek_memchunk.memblock);
1353     pa_memchunk_reset(&s->peek_memchunk);
1354
1355     return 0;
1356 }
1357
1358 size_t pa_stream_writable_size(pa_stream *s) {
1359     pa_assert(s);
1360     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1361
1362     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1363     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1364     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1365
1366     return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1367 }
1368
1369 size_t pa_stream_readable_size(pa_stream *s) {
1370     pa_assert(s);
1371     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1372
1373     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1374     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1375     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1376
1377     return pa_memblockq_get_length(s->record_memblockq);
1378 }
1379
1380 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1381     pa_operation *o;
1382     pa_tagstruct *t;
1383     uint32_t tag;
1384
1385     pa_assert(s);
1386     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1387
1388     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1389     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1390     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1391
1392     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1393
1394     t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1395     pa_tagstruct_putu32(t, s->channel);
1396     pa_pstream_send_tagstruct(s->context->pstream, t);
1397     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1398
1399     return o;
1400 }
1401
1402 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1403     pa_usec_t usec;
1404
1405     pa_assert(s);
1406     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1407     pa_assert(s->state == PA_STREAM_READY);
1408     pa_assert(s->direction != PA_STREAM_UPLOAD);
1409     pa_assert(s->timing_info_valid);
1410     pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1411     pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1412
1413     if (s->direction == PA_STREAM_PLAYBACK) {
1414         /* The last byte that was written into the output device
1415          * had this time value associated */
1416         usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1417
1418         if (!s->corked && !s->suspended) {
1419
1420             if (!ignore_transport)
1421                 /* Because the latency info took a little time to come
1422                  * to us, we assume that the real output time is actually
1423                  * a little ahead */
1424                 usec += s->timing_info.transport_usec;
1425
1426             /* However, the output device usually maintains a buffer
1427                too, hence the real sample currently played is a little
1428                back  */
1429             if (s->timing_info.sink_usec >= usec)
1430                 usec = 0;
1431             else
1432                 usec -= s->timing_info.sink_usec;
1433         }
1434
1435     } else {
1436         pa_assert(s->direction == PA_STREAM_RECORD);
1437
1438         /* The last byte written into the server side queue had
1439          * this time value associated */
1440         usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1441
1442         if (!s->corked && !s->suspended) {
1443
1444             if (!ignore_transport)
1445                 /* Add transport latency */
1446                 usec += s->timing_info.transport_usec;
1447
1448             /* Add latency of data in device buffer */
1449             usec += s->timing_info.source_usec;
1450
1451             /* If this is a monitor source, we need to correct the
1452              * time by the playback device buffer */
1453             if (s->timing_info.sink_usec >= usec)
1454                 usec = 0;
1455             else
1456                 usec -= s->timing_info.sink_usec;
1457         }
1458     }
1459
1460     return usec;
1461 }
1462
1463 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1464     pa_operation *o = userdata;
1465     struct timeval local, remote, now;
1466     pa_timing_info *i;
1467     pa_bool_t playing = FALSE;
1468     uint64_t underrun_for = 0, playing_for = 0;
1469
1470     pa_assert(pd);
1471     pa_assert(o);
1472     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1473
1474     if (!o->context || !o->stream)
1475         goto finish;
1476
1477     i = &o->stream->timing_info;
1478
1479     o->stream->timing_info_valid = FALSE;
1480     i->write_index_corrupt = TRUE;
1481     i->read_index_corrupt = TRUE;
1482
1483     if (command != PA_COMMAND_REPLY) {
1484         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1485             goto finish;
1486
1487     } else {
1488
1489         if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1490             pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1491             pa_tagstruct_get_boolean(t, &playing) < 0 ||
1492             pa_tagstruct_get_timeval(t, &local) < 0 ||
1493             pa_tagstruct_get_timeval(t, &remote) < 0 ||
1494             pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1495             pa_tagstruct_gets64(t, &i->read_index) < 0) {
1496
1497             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1498             goto finish;
1499         }
1500
1501         if (o->context->version >= 13 &&
1502             o->stream->direction == PA_STREAM_PLAYBACK)
1503             if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1504                 pa_tagstruct_getu64(t, &playing_for) < 0) {
1505
1506                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1507                 goto finish;
1508             }
1509
1510
1511         if (!pa_tagstruct_eof(t)) {
1512             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1513             goto finish;
1514         }
1515         o->stream->timing_info_valid = TRUE;
1516         i->write_index_corrupt = FALSE;
1517         i->read_index_corrupt = FALSE;
1518
1519         i->playing = (int) playing;
1520         i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1521
1522         pa_gettimeofday(&now);
1523
1524         /* Calculcate timestamps */
1525         if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1526             /* local and remote seem to have synchronized clocks */
1527
1528             if (o->stream->direction == PA_STREAM_PLAYBACK)
1529                 i->transport_usec = pa_timeval_diff(&remote, &local);
1530             else
1531                 i->transport_usec = pa_timeval_diff(&now, &remote);
1532
1533             i->synchronized_clocks = TRUE;
1534             i->timestamp = remote;
1535         } else {
1536             /* clocks are not synchronized, let's estimate latency then */
1537             i->transport_usec = pa_timeval_diff(&now, &local)/2;
1538             i->synchronized_clocks = FALSE;
1539             i->timestamp = local;
1540             pa_timeval_add(&i->timestamp, i->transport_usec);
1541         }
1542
1543         /* Invalidate read and write indexes if necessary */
1544         if (tag < o->stream->read_index_not_before)
1545             i->read_index_corrupt = TRUE;
1546
1547         if (tag < o->stream->write_index_not_before)
1548             i->write_index_corrupt = TRUE;
1549
1550         if (o->stream->direction == PA_STREAM_PLAYBACK) {
1551             /* Write index correction */
1552
1553             int n, j;
1554             uint32_t ctag = tag;
1555
1556             /* Go through the saved correction values and add up the
1557              * total correction.*/
1558             for (n = 0, j = o->stream->current_write_index_correction+1;
1559                  n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1560                  n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1561
1562                 /* Step over invalid data or out-of-date data */
1563                 if (!o->stream->write_index_corrections[j].valid ||
1564                     o->stream->write_index_corrections[j].tag < ctag)
1565                     continue;
1566
1567                 /* Make sure that everything is in order */
1568                 ctag = o->stream->write_index_corrections[j].tag+1;
1569
1570                 /* Now fix the write index */
1571                 if (o->stream->write_index_corrections[j].corrupt) {
1572                     /* A corrupting seek was made */
1573                     i->write_index_corrupt = TRUE;
1574                 } else if (o->stream->write_index_corrections[j].absolute) {
1575                     /* An absolute seek was made */
1576                     i->write_index = o->stream->write_index_corrections[j].value;
1577                     i->write_index_corrupt = FALSE;
1578                 } else if (!i->write_index_corrupt) {
1579                     /* A relative seek was made */
1580                     i->write_index += o->stream->write_index_corrections[j].value;
1581                 }
1582             }
1583
1584             /* Clear old correction entries */
1585             for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1586                 if (!o->stream->write_index_corrections[n].valid)
1587                     continue;
1588
1589                 if (o->stream->write_index_corrections[n].tag <= tag)
1590                     o->stream->write_index_corrections[n].valid = FALSE;
1591             }
1592         }
1593
1594         if (o->stream->direction == PA_STREAM_RECORD) {
1595             /* Read index correction */
1596
1597             if (!i->read_index_corrupt)
1598                 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1599         }
1600
1601         /* Update smoother */
1602         if (o->stream->smoother) {
1603             pa_usec_t u, x;
1604
1605             u = x = pa_rtclock_usec() - i->transport_usec;
1606
1607             if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1608                 pa_usec_t su;
1609
1610                 /* If we weren't playing then it will take some time
1611                  * until the audio will actually come out through the
1612                  * speakers. Since we follow that timing here, we need
1613                  * to try to fix this up */
1614
1615                 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1616
1617                 if (su < i->sink_usec)
1618                     x += i->sink_usec - su;
1619             }
1620
1621             if (!i->playing)
1622                 pa_smoother_pause(o->stream->smoother, x);
1623
1624             /* Update the smoother */
1625             if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1626                 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1627                 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1628
1629             if (i->playing)
1630                 pa_smoother_resume(o->stream->smoother, x, TRUE);
1631         }
1632     }
1633
1634     o->stream->auto_timing_update_requested = FALSE;
1635
1636     if (o->stream->latency_update_callback)
1637         o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1638
1639     if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1640         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1641         cb(o->stream, o->stream->timing_info_valid, o->userdata);
1642     }
1643
1644 finish:
1645
1646     pa_operation_done(o);
1647     pa_operation_unref(o);
1648 }
1649
1650 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1651     uint32_t tag;
1652     pa_operation *o;
1653     pa_tagstruct *t;
1654     struct timeval now;
1655     int cidx = 0;
1656
1657     pa_assert(s);
1658     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1659
1660     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1661     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1662     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1663
1664     if (s->direction == PA_STREAM_PLAYBACK) {
1665         /* Find a place to store the write_index correction data for this entry */
1666         cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1667
1668         /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1669         PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1670     }
1671     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1672
1673     t = pa_tagstruct_command(
1674             s->context,
1675             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1676             &tag);
1677     pa_tagstruct_putu32(t, s->channel);
1678     pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1679
1680     pa_pstream_send_tagstruct(s->context->pstream, t);
1681     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1682
1683     if (s->direction == PA_STREAM_PLAYBACK) {
1684         /* Fill in initial correction data */
1685
1686         s->current_write_index_correction = cidx;
1687
1688         s->write_index_corrections[cidx].valid = TRUE;
1689         s->write_index_corrections[cidx].absolute = FALSE;
1690         s->write_index_corrections[cidx].corrupt = FALSE;
1691         s->write_index_corrections[cidx].tag = tag;
1692         s->write_index_corrections[cidx].value = 0;
1693     }
1694
1695     return o;
1696 }
1697
1698 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1699     pa_stream *s = userdata;
1700
1701     pa_assert(pd);
1702     pa_assert(s);
1703     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1704
1705     pa_stream_ref(s);
1706
1707     if (command != PA_COMMAND_REPLY) {
1708         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1709             goto finish;
1710
1711         pa_stream_set_state(s, PA_STREAM_FAILED);
1712         goto finish;
1713     } else if (!pa_tagstruct_eof(t)) {
1714         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1715         goto finish;
1716     }
1717
1718     pa_stream_set_state(s, PA_STREAM_TERMINATED);
1719
1720 finish:
1721     pa_stream_unref(s);
1722 }
1723
1724 int pa_stream_disconnect(pa_stream *s) {
1725     pa_tagstruct *t;
1726     uint32_t tag;
1727
1728     pa_assert(s);
1729     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1730
1731     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1732     PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1733     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1734
1735     pa_stream_ref(s);
1736
1737     t = pa_tagstruct_command(
1738             s->context,
1739             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1740                         (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1741             &tag);
1742     pa_tagstruct_putu32(t, s->channel);
1743     pa_pstream_send_tagstruct(s->context->pstream, t);
1744     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1745
1746     pa_stream_unref(s);
1747     return 0;
1748 }
1749
1750 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1751     pa_assert(s);
1752     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1753
1754     if (pa_detect_fork())
1755         return;
1756
1757     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1758         return;
1759
1760     s->read_callback = cb;
1761     s->read_userdata = userdata;
1762 }
1763
1764 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1765     pa_assert(s);
1766     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1767
1768     if (pa_detect_fork())
1769         return;
1770
1771     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1772         return;
1773
1774     s->write_callback = cb;
1775     s->write_userdata = userdata;
1776 }
1777
1778 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1779     pa_assert(s);
1780     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1781
1782     if (pa_detect_fork())
1783         return;
1784
1785     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1786         return;
1787
1788     s->state_callback = cb;
1789     s->state_userdata = userdata;
1790 }
1791
1792 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1793     pa_assert(s);
1794     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1795
1796     if (pa_detect_fork())
1797         return;
1798
1799     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1800         return;
1801
1802     s->overflow_callback = cb;
1803     s->overflow_userdata = userdata;
1804 }
1805
1806 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1807     pa_assert(s);
1808     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1809
1810     if (pa_detect_fork())
1811         return;
1812
1813     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1814         return;
1815
1816     s->underflow_callback = cb;
1817     s->underflow_userdata = userdata;
1818 }
1819
1820 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1821     pa_assert(s);
1822     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1823
1824     if (pa_detect_fork())
1825         return;
1826
1827     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1828         return;
1829
1830     s->latency_update_callback = cb;
1831     s->latency_update_userdata = userdata;
1832 }
1833
1834 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1835     pa_assert(s);
1836     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1837
1838     if (pa_detect_fork())
1839         return;
1840
1841     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1842         return;
1843
1844     s->moved_callback = cb;
1845     s->moved_userdata = userdata;
1846 }
1847
1848 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1849     pa_assert(s);
1850     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1851
1852     if (pa_detect_fork())
1853         return;
1854
1855     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1856         return;
1857
1858     s->suspended_callback = cb;
1859     s->suspended_userdata = userdata;
1860 }
1861
1862 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1863     pa_assert(s);
1864     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1865
1866     if (pa_detect_fork())
1867         return;
1868
1869     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1870         return;
1871
1872     s->started_callback = cb;
1873     s->started_userdata = userdata;
1874 }
1875
1876 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
1877     pa_assert(s);
1878     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1879
1880     if (pa_detect_fork())
1881         return;
1882
1883     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1884         return;
1885
1886     s->event_callback = cb;
1887     s->event_userdata = userdata;
1888 }
1889
1890 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1891     pa_assert(s);
1892     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1893
1894     if (pa_detect_fork())
1895         return;
1896
1897     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1898         return;
1899
1900     s->buffer_attr_callback = cb;
1901     s->buffer_attr_userdata = userdata;
1902 }
1903
1904 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1905     pa_operation *o = userdata;
1906     int success = 1;
1907
1908     pa_assert(pd);
1909     pa_assert(o);
1910     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1911
1912     if (!o->context)
1913         goto finish;
1914
1915     if (command != PA_COMMAND_REPLY) {
1916         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1917             goto finish;
1918
1919         success = 0;
1920     } else if (!pa_tagstruct_eof(t)) {
1921         pa_context_fail(o->context, PA_ERR_PROTOCOL);
1922         goto finish;
1923     }
1924
1925     if (o->callback) {
1926         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1927         cb(o->stream, success, o->userdata);
1928     }
1929
1930 finish:
1931     pa_operation_done(o);
1932     pa_operation_unref(o);
1933 }
1934
1935 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
1936     pa_operation *o;
1937     pa_tagstruct *t;
1938     uint32_t tag;
1939
1940     pa_assert(s);
1941     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1942
1943     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1944     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1945     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1946
1947     s->corked = b;
1948
1949     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1950
1951     t = pa_tagstruct_command(
1952             s->context,
1953             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
1954             &tag);
1955     pa_tagstruct_putu32(t, s->channel);
1956     pa_tagstruct_put_boolean(t, !!b);
1957     pa_pstream_send_tagstruct(s->context->pstream, t);
1958     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1959
1960     check_smoother_status(s, FALSE, FALSE, FALSE);
1961
1962     /* This might cause the indexes to hang/start again, hence
1963      * let's request a timing update */
1964     request_auto_timing_update(s, TRUE);
1965
1966     return o;
1967 }
1968
1969 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
1970     pa_tagstruct *t;
1971     pa_operation *o;
1972     uint32_t tag;
1973
1974     pa_assert(s);
1975     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1976
1977     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1978     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1979
1980     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1981
1982     t = pa_tagstruct_command(s->context, command, &tag);
1983     pa_tagstruct_putu32(t, s->channel);
1984     pa_pstream_send_tagstruct(s->context->pstream, t);
1985     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1986
1987     return o;
1988 }
1989
1990 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1991     pa_operation *o;
1992
1993     pa_assert(s);
1994     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1995
1996     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1997     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1998     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1999
2000     if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2001         return NULL;
2002
2003     if (s->direction == PA_STREAM_PLAYBACK) {
2004
2005         if (s->write_index_corrections[s->current_write_index_correction].valid)
2006             s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2007
2008         if (s->buffer_attr.prebuf > 0)
2009             check_smoother_status(s, FALSE, FALSE, TRUE);
2010
2011         /* This will change the write index, but leave the
2012          * read index untouched. */
2013         invalidate_indexes(s, FALSE, TRUE);
2014
2015     } else
2016         /* For record streams this has no influence on the write
2017          * index, but the read index might jump. */
2018         invalidate_indexes(s, TRUE, FALSE);
2019
2020     return o;
2021 }
2022
2023 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2024     pa_operation *o;
2025
2026     pa_assert(s);
2027     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2028
2029     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2030     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2031     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2032     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2033
2034     if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2035         return NULL;
2036
2037     /* This might cause the read index to hang again, hence
2038      * let's request a timing update */
2039     request_auto_timing_update(s, TRUE);
2040
2041     return o;
2042 }
2043
2044 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2045     pa_operation *o;
2046
2047     pa_assert(s);
2048     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2049
2050     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2051     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2052     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2053     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2054
2055     if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2056         return NULL;
2057
2058     /* This might cause the read index to start moving again, hence
2059      * let's request a timing update */
2060     request_auto_timing_update(s, TRUE);
2061
2062     return o;
2063 }
2064
2065 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2066     pa_operation *o;
2067
2068     pa_assert(s);
2069     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2070     pa_assert(name);
2071
2072     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2073     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2074     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2075
2076     if (s->context->version >= 13) {
2077         pa_proplist *p = pa_proplist_new();
2078
2079         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2080         o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2081         pa_proplist_free(p);
2082     } else {
2083         pa_tagstruct *t;
2084         uint32_t tag;
2085
2086         o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2087         t = pa_tagstruct_command(
2088                 s->context,
2089                 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2090                 &tag);
2091         pa_tagstruct_putu32(t, s->channel);
2092         pa_tagstruct_puts(t, name);
2093         pa_pstream_send_tagstruct(s->context->pstream, t);
2094         pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2095     }
2096
2097     return o;
2098 }
2099
2100 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2101     pa_usec_t usec;
2102
2103     pa_assert(s);
2104     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2105
2106     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2107     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2108     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2109     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2110     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2111     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2112
2113     if (s->smoother)
2114         usec = pa_smoother_get(s->smoother, pa_rtclock_usec());
2115     else
2116         usec = calc_time(s, FALSE);
2117
2118     /* Make sure the time runs monotonically */
2119     if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2120         if (usec < s->previous_time)
2121             usec = s->previous_time;
2122         else
2123             s->previous_time = usec;
2124     }
2125
2126     if (r_usec)
2127         *r_usec = usec;
2128
2129     return 0;
2130 }
2131
2132 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2133     pa_assert(s);
2134     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2135
2136     if (negative)
2137         *negative = 0;
2138
2139     if (a >= b)
2140         return a-b;
2141     else {
2142         if (negative && s->direction == PA_STREAM_RECORD) {
2143             *negative = 1;
2144             return b-a;
2145         } else
2146             return 0;
2147     }
2148 }
2149
2150 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2151     pa_usec_t t, c;
2152     int r;
2153     int64_t cindex;
2154
2155     pa_assert(s);
2156     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2157     pa_assert(r_usec);
2158
2159     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2160     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2161     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2162     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2163     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2164     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2165
2166     if ((r = pa_stream_get_time(s, &t)) < 0)
2167         return r;
2168
2169     if (s->direction == PA_STREAM_PLAYBACK)
2170         cindex = s->timing_info.write_index;
2171     else
2172         cindex = s->timing_info.read_index;
2173
2174     if (cindex < 0)
2175         cindex = 0;
2176
2177     c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2178
2179     if (s->direction == PA_STREAM_PLAYBACK)
2180         *r_usec = time_counter_diff(s, c, t, negative);
2181     else
2182         *r_usec = time_counter_diff(s, t, c, negative);
2183
2184     return 0;
2185 }
2186
2187 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2188     pa_assert(s);
2189     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2190
2191     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2192     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2193     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2194     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2195
2196     return &s->timing_info;
2197 }
2198
2199 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2200     pa_assert(s);
2201     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2202
2203     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2204
2205     return &s->sample_spec;
2206 }
2207
2208 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2209     pa_assert(s);
2210     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2211
2212     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2213
2214     return &s->channel_map;
2215 }
2216
2217 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2218     pa_assert(s);
2219     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2220
2221     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2222     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2223     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2224
2225     return &s->buffer_attr;
2226 }
2227
2228 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2229     pa_operation *o = userdata;
2230     int success = 1;
2231
2232     pa_assert(pd);
2233     pa_assert(o);
2234     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2235
2236     if (!o->context)
2237         goto finish;
2238
2239     if (command != PA_COMMAND_REPLY) {
2240         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2241             goto finish;
2242
2243         success = 0;
2244     } else {
2245         if (o->stream->direction == PA_STREAM_PLAYBACK) {
2246             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2247                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2248                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2249                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2250                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2251                 goto finish;
2252             }
2253         } else if (o->stream->direction == PA_STREAM_RECORD) {
2254             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2255                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2256                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2257                 goto finish;
2258             }
2259         }
2260
2261         if (o->stream->context->version >= 13) {
2262             pa_usec_t usec;
2263
2264             if (pa_tagstruct_get_usec(t, &usec) < 0) {
2265                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2266                 goto finish;
2267             }
2268
2269             if (o->stream->direction == PA_STREAM_RECORD)
2270                 o->stream->timing_info.configured_source_usec = usec;
2271             else
2272                 o->stream->timing_info.configured_sink_usec = usec;
2273         }
2274
2275         if (!pa_tagstruct_eof(t)) {
2276             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2277             goto finish;
2278         }
2279     }
2280
2281     if (o->callback) {
2282         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2283         cb(o->stream, success, o->userdata);
2284     }
2285
2286 finish:
2287     pa_operation_done(o);
2288     pa_operation_unref(o);
2289 }
2290
2291
2292 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2293     pa_operation *o;
2294     pa_tagstruct *t;
2295     uint32_t tag;
2296
2297     pa_assert(s);
2298     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2299     pa_assert(attr);
2300
2301     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2302     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2303     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2304     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2305
2306     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2307
2308     t = pa_tagstruct_command(
2309             s->context,
2310             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2311             &tag);
2312     pa_tagstruct_putu32(t, s->channel);
2313
2314     pa_tagstruct_putu32(t, attr->maxlength);
2315
2316     if (s->direction == PA_STREAM_PLAYBACK)
2317         pa_tagstruct_put(
2318                 t,
2319                 PA_TAG_U32, attr->tlength,
2320                 PA_TAG_U32, attr->prebuf,
2321                 PA_TAG_U32, attr->minreq,
2322                 PA_TAG_INVALID);
2323     else
2324         pa_tagstruct_putu32(t, attr->fragsize);
2325
2326     if (s->context->version >= 13)
2327         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2328
2329     if (s->context->version >= 14)
2330         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2331
2332     pa_pstream_send_tagstruct(s->context->pstream, t);
2333     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2334
2335     /* This might cause changes in the read/write indexex, hence let's
2336      * request a timing update */
2337     request_auto_timing_update(s, TRUE);
2338
2339     return o;
2340 }
2341
2342 uint32_t pa_stream_get_device_index(pa_stream *s) {
2343     pa_assert(s);
2344     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2345
2346     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2347     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2348     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2349     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2350     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2351
2352     return s->device_index;
2353 }
2354
2355 const char *pa_stream_get_device_name(pa_stream *s) {
2356     pa_assert(s);
2357     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2358
2359     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2360     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2361     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2362     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2363     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2364
2365     return s->device_name;
2366 }
2367
2368 int pa_stream_is_suspended(pa_stream *s) {
2369     pa_assert(s);
2370     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2371
2372     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2373     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2374     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2375     PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2376
2377     return s->suspended;
2378 }
2379
2380 int pa_stream_is_corked(pa_stream *s) {
2381     pa_assert(s);
2382     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2383
2384     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2385     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2386     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2387
2388     return s->corked;
2389 }
2390
2391 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2392     pa_operation *o = userdata;
2393     int success = 1;
2394
2395     pa_assert(pd);
2396     pa_assert(o);
2397     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2398
2399     if (!o->context)
2400         goto finish;
2401
2402     if (command != PA_COMMAND_REPLY) {
2403         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2404             goto finish;
2405
2406         success = 0;
2407     } else {
2408
2409         if (!pa_tagstruct_eof(t)) {
2410             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2411             goto finish;
2412         }
2413     }
2414
2415     o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2416     pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2417
2418     if (o->callback) {
2419         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2420         cb(o->stream, success, o->userdata);
2421     }
2422
2423 finish:
2424     pa_operation_done(o);
2425     pa_operation_unref(o);
2426 }
2427
2428
2429 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2430     pa_operation *o;
2431     pa_tagstruct *t;
2432     uint32_t tag;
2433
2434     pa_assert(s);
2435     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2436
2437     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2438     PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2439     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2440     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2441     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2442     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2443
2444     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2445     o->private = PA_UINT_TO_PTR(rate);
2446
2447     t = pa_tagstruct_command(
2448             s->context,
2449             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2450             &tag);
2451     pa_tagstruct_putu32(t, s->channel);
2452     pa_tagstruct_putu32(t, rate);
2453
2454     pa_pstream_send_tagstruct(s->context->pstream, t);
2455     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2456
2457     return o;
2458 }
2459
2460 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2461     pa_operation *o;
2462     pa_tagstruct *t;
2463     uint32_t tag;
2464
2465     pa_assert(s);
2466     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2467
2468     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2469     PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2470     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2471     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2472     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2473
2474     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2475
2476     t = pa_tagstruct_command(
2477             s->context,
2478             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2479             &tag);
2480     pa_tagstruct_putu32(t, s->channel);
2481     pa_tagstruct_putu32(t, (uint32_t) mode);
2482     pa_tagstruct_put_proplist(t, p);
2483
2484     pa_pstream_send_tagstruct(s->context->pstream, t);
2485     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2486
2487     /* Please note that we don't update s->proplist here, because we
2488      * don't export that field */
2489
2490     return o;
2491 }
2492
2493 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2494     pa_operation *o;
2495     pa_tagstruct *t;
2496     uint32_t tag;
2497     const char * const*k;
2498
2499     pa_assert(s);
2500     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2501
2502     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2503     PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2504     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2505     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2506     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2507
2508     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2509
2510     t = pa_tagstruct_command(
2511             s->context,
2512             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2513             &tag);
2514     pa_tagstruct_putu32(t, s->channel);
2515
2516     for (k = keys; *k; k++)
2517         pa_tagstruct_puts(t, *k);
2518
2519     pa_tagstruct_puts(t, NULL);
2520
2521     pa_pstream_send_tagstruct(s->context->pstream, t);
2522     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2523
2524     /* Please note that we don't update s->proplist here, because we
2525      * don't export that field */
2526
2527     return o;
2528 }
2529
2530 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2531     pa_assert(s);
2532     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2533
2534     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2535     PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2536     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2537     PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2538
2539     s->direct_on_input = sink_input_idx;
2540
2541     return 0;
2542 }
2543
2544 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2545     pa_assert(s);
2546     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2547
2548     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2549     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2550     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2551
2552     return s->direct_on_input;
2553 }