Merge branch 'master' of git://0pointer.de/pulseaudio into dbus-work
[profile/ivi/pulseaudio.git] / src / pulse / stream.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/rtclock.h>
34 #include <pulse/xmalloc.h>
35
36 #include <pulsecore/pstream-util.h>
37 #include <pulsecore/log.h>
38 #include <pulsecore/hashmap.h>
39 #include <pulsecore/macro.h>
40 #include <pulsecore/core-rtclock.h>
41
42 #include "fork-detect.h"
43 #include "internal.h"
44
45 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
46 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
47
48 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
49 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
50 #define SMOOTHER_MIN_HISTORY (4)
51
52 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
53     return pa_stream_new_with_proplist(c, name, ss, map, NULL);
54 }
55
56 static void reset_callbacks(pa_stream *s) {
57     s->read_callback = NULL;
58     s->read_userdata = NULL;
59     s->write_callback = NULL;
60     s->write_userdata = NULL;
61     s->state_callback = NULL;
62     s->state_userdata = NULL;
63     s->overflow_callback = NULL;
64     s->overflow_userdata = NULL;
65     s->underflow_callback = NULL;
66     s->underflow_userdata = NULL;
67     s->latency_update_callback = NULL;
68     s->latency_update_userdata = NULL;
69     s->moved_callback = NULL;
70     s->moved_userdata = NULL;
71     s->suspended_callback = NULL;
72     s->suspended_userdata = NULL;
73     s->started_callback = NULL;
74     s->started_userdata = NULL;
75     s->event_callback = NULL;
76     s->event_userdata = NULL;
77     s->buffer_attr_callback = NULL;
78     s->buffer_attr_userdata = NULL;
79 }
80
81 pa_stream *pa_stream_new_with_proplist(
82         pa_context *c,
83         const char *name,
84         const pa_sample_spec *ss,
85         const pa_channel_map *map,
86         pa_proplist *p) {
87
88     pa_stream *s;
89     int i;
90     pa_channel_map tmap;
91
92     pa_assert(c);
93     pa_assert(PA_REFCNT_VALUE(c) >= 1);
94
95     PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
96     PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
97     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
98     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
99     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
100     PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
101     PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
102
103     if (!map)
104         PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
105
106     s = pa_xnew(pa_stream, 1);
107     PA_REFCNT_INIT(s);
108     s->context = c;
109     s->mainloop = c->mainloop;
110
111     s->direction = PA_STREAM_NODIRECTION;
112     s->state = PA_STREAM_UNCONNECTED;
113     s->flags = 0;
114
115     s->sample_spec = *ss;
116     s->channel_map = *map;
117
118     s->direct_on_input = PA_INVALID_INDEX;
119
120     s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
121     if (name)
122         pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
123
124     s->channel = 0;
125     s->channel_valid = FALSE;
126     s->syncid = c->csyncid++;
127     s->stream_index = PA_INVALID_INDEX;
128
129     s->requested_bytes = 0;
130     memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
131
132     /* We initialize der target length here, so that if the user
133      * passes no explicit buffering metrics the default is similar to
134      * what older PA versions provided. */
135
136     s->buffer_attr.maxlength = (uint32_t) -1;
137     s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
138     s->buffer_attr.minreq = (uint32_t) -1;
139     s->buffer_attr.prebuf = (uint32_t) -1;
140     s->buffer_attr.fragsize = (uint32_t) -1;
141
142     s->device_index = PA_INVALID_INDEX;
143     s->device_name = NULL;
144     s->suspended = FALSE;
145     s->corked = FALSE;
146
147     s->write_memblock = NULL;
148     s->write_data = NULL;
149
150     pa_memchunk_reset(&s->peek_memchunk);
151     s->peek_data = NULL;
152     s->record_memblockq = NULL;
153
154     memset(&s->timing_info, 0, sizeof(s->timing_info));
155     s->timing_info_valid = FALSE;
156
157     s->previous_time = 0;
158
159     s->read_index_not_before = 0;
160     s->write_index_not_before = 0;
161     for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
162         s->write_index_corrections[i].valid = 0;
163     s->current_write_index_correction = 0;
164
165     s->auto_timing_update_event = NULL;
166     s->auto_timing_update_requested = FALSE;
167     s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
168
169     reset_callbacks(s);
170
171     s->smoother = NULL;
172
173     /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
174     PA_LLIST_PREPEND(pa_stream, c->streams, s);
175     pa_stream_ref(s);
176
177     return s;
178 }
179
180 static void stream_unlink(pa_stream *s) {
181     pa_operation *o, *n;
182     pa_assert(s);
183
184     if (!s->context)
185         return;
186
187     /* Detach from context */
188
189     /* Unref all operatio object that point to us */
190     for (o = s->context->operations; o; o = n) {
191         n = o->next;
192
193         if (o->stream == s)
194             pa_operation_cancel(o);
195     }
196
197     /* Drop all outstanding replies for this stream */
198     if (s->context->pdispatch)
199         pa_pdispatch_unregister_reply(s->context->pdispatch, s);
200
201     if (s->channel_valid) {
202         pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL);
203         s->channel = 0;
204         s->channel_valid = FALSE;
205     }
206
207     PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
208     pa_stream_unref(s);
209
210     s->context = NULL;
211
212     if (s->auto_timing_update_event) {
213         pa_assert(s->mainloop);
214         s->mainloop->time_free(s->auto_timing_update_event);
215     }
216
217     reset_callbacks(s);
218 }
219
220 static void stream_free(pa_stream *s) {
221     pa_assert(s);
222
223     stream_unlink(s);
224
225     if (s->write_memblock) {
226         pa_memblock_release(s->write_memblock);
227         pa_memblock_unref(s->write_data);
228     }
229
230     if (s->peek_memchunk.memblock) {
231         if (s->peek_data)
232             pa_memblock_release(s->peek_memchunk.memblock);
233         pa_memblock_unref(s->peek_memchunk.memblock);
234     }
235
236     if (s->record_memblockq)
237         pa_memblockq_free(s->record_memblockq);
238
239     if (s->proplist)
240         pa_proplist_free(s->proplist);
241
242     if (s->smoother)
243         pa_smoother_free(s->smoother);
244
245     pa_xfree(s->device_name);
246     pa_xfree(s);
247 }
248
249 void pa_stream_unref(pa_stream *s) {
250     pa_assert(s);
251     pa_assert(PA_REFCNT_VALUE(s) >= 1);
252
253     if (PA_REFCNT_DEC(s) <= 0)
254         stream_free(s);
255 }
256
257 pa_stream* pa_stream_ref(pa_stream *s) {
258     pa_assert(s);
259     pa_assert(PA_REFCNT_VALUE(s) >= 1);
260
261     PA_REFCNT_INC(s);
262     return s;
263 }
264
265 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
266     pa_assert(s);
267     pa_assert(PA_REFCNT_VALUE(s) >= 1);
268
269     return s->state;
270 }
271
272 pa_context* pa_stream_get_context(pa_stream *s) {
273     pa_assert(s);
274     pa_assert(PA_REFCNT_VALUE(s) >= 1);
275
276     return s->context;
277 }
278
279 uint32_t pa_stream_get_index(pa_stream *s) {
280     pa_assert(s);
281     pa_assert(PA_REFCNT_VALUE(s) >= 1);
282
283     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
284     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
285
286     return s->stream_index;
287 }
288
289 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
290     pa_assert(s);
291     pa_assert(PA_REFCNT_VALUE(s) >= 1);
292
293     if (s->state == st)
294         return;
295
296     pa_stream_ref(s);
297
298     s->state = st;
299
300     if (s->state_callback)
301         s->state_callback(s, s->state_userdata);
302
303     if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
304         stream_unlink(s);
305
306     pa_stream_unref(s);
307 }
308
309 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
310     pa_assert(s);
311     pa_assert(PA_REFCNT_VALUE(s) >= 1);
312
313     if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
314         return;
315
316     if (s->state == PA_STREAM_READY &&
317         (force || !s->auto_timing_update_requested)) {
318         pa_operation *o;
319
320 /*         pa_log("Automatically requesting new timing data"); */
321
322         if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
323             pa_operation_unref(o);
324             s->auto_timing_update_requested = TRUE;
325         }
326     }
327
328     if (s->auto_timing_update_event) {
329         if (force)
330             s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
331
332         pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
333
334         s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
335     }
336 }
337
338 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
339     pa_context *c = userdata;
340     pa_stream *s;
341     uint32_t channel;
342
343     pa_assert(pd);
344     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
345     pa_assert(t);
346     pa_assert(c);
347     pa_assert(PA_REFCNT_VALUE(c) >= 1);
348
349     pa_context_ref(c);
350
351     if (pa_tagstruct_getu32(t, &channel) < 0 ||
352         !pa_tagstruct_eof(t)) {
353         pa_context_fail(c, PA_ERR_PROTOCOL);
354         goto finish;
355     }
356
357     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel)))
358         goto finish;
359
360     if (s->state != PA_STREAM_READY)
361         goto finish;
362
363     pa_context_set_error(c, PA_ERR_KILLED);
364     pa_stream_set_state(s, PA_STREAM_FAILED);
365
366 finish:
367     pa_context_unref(c);
368 }
369
370 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
371     pa_usec_t x;
372
373     pa_assert(s);
374     pa_assert(!force_start || !force_stop);
375
376     if (!s->smoother)
377         return;
378
379     x = pa_rtclock_now();
380
381     if (s->timing_info_valid) {
382         if (aposteriori)
383             x -= s->timing_info.transport_usec;
384         else
385             x += s->timing_info.transport_usec;
386     }
387
388     if (s->suspended || s->corked || force_stop)
389         pa_smoother_pause(s->smoother, x);
390     else if (force_start || s->buffer_attr.prebuf == 0)
391         pa_smoother_resume(s->smoother, x, TRUE);
392
393
394     /* Please note that we have no idea if playback actually started
395      * if prebuf is non-zero! */
396 }
397
398 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
399     pa_context *c = userdata;
400     pa_stream *s;
401     uint32_t channel;
402     const char *dn;
403     pa_bool_t suspended;
404     uint32_t di;
405     pa_usec_t usec = 0;
406     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
407
408     pa_assert(pd);
409     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
410     pa_assert(t);
411     pa_assert(c);
412     pa_assert(PA_REFCNT_VALUE(c) >= 1);
413
414     pa_context_ref(c);
415
416     if (c->version < 12) {
417         pa_context_fail(c, PA_ERR_PROTOCOL);
418         goto finish;
419     }
420
421     if (pa_tagstruct_getu32(t, &channel) < 0 ||
422         pa_tagstruct_getu32(t, &di) < 0 ||
423         pa_tagstruct_gets(t, &dn) < 0 ||
424         pa_tagstruct_get_boolean(t, &suspended) < 0) {
425         pa_context_fail(c, PA_ERR_PROTOCOL);
426         goto finish;
427     }
428
429     if (c->version >= 13) {
430
431         if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
432             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
433                 pa_tagstruct_getu32(t, &fragsize) < 0 ||
434                 pa_tagstruct_get_usec(t, &usec) < 0) {
435                 pa_context_fail(c, PA_ERR_PROTOCOL);
436                 goto finish;
437             }
438         } else {
439             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
440                 pa_tagstruct_getu32(t, &tlength) < 0 ||
441                 pa_tagstruct_getu32(t, &prebuf) < 0 ||
442                 pa_tagstruct_getu32(t, &minreq) < 0 ||
443                 pa_tagstruct_get_usec(t, &usec) < 0) {
444                 pa_context_fail(c, PA_ERR_PROTOCOL);
445                 goto finish;
446             }
447         }
448     }
449
450     if (!pa_tagstruct_eof(t)) {
451         pa_context_fail(c, PA_ERR_PROTOCOL);
452         goto finish;
453     }
454
455     if (!dn || di == PA_INVALID_INDEX) {
456         pa_context_fail(c, PA_ERR_PROTOCOL);
457         goto finish;
458     }
459
460     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, channel)))
461         goto finish;
462
463     if (s->state != PA_STREAM_READY)
464         goto finish;
465
466     if (c->version >= 13) {
467         if (s->direction == PA_STREAM_RECORD)
468             s->timing_info.configured_source_usec = usec;
469         else
470             s->timing_info.configured_sink_usec = usec;
471
472         s->buffer_attr.maxlength = maxlength;
473         s->buffer_attr.fragsize = fragsize;
474         s->buffer_attr.tlength = tlength;
475         s->buffer_attr.prebuf = prebuf;
476         s->buffer_attr.minreq = minreq;
477     }
478
479     pa_xfree(s->device_name);
480     s->device_name = pa_xstrdup(dn);
481     s->device_index = di;
482
483     s->suspended = suspended;
484
485     check_smoother_status(s, TRUE, FALSE, FALSE);
486     request_auto_timing_update(s, TRUE);
487
488     if (s->moved_callback)
489         s->moved_callback(s, s->moved_userdata);
490
491 finish:
492     pa_context_unref(c);
493 }
494
495 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
496     pa_context *c = userdata;
497     pa_stream *s;
498     uint32_t channel;
499     pa_usec_t usec = 0;
500     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
501
502     pa_assert(pd);
503     pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
504     pa_assert(t);
505     pa_assert(c);
506     pa_assert(PA_REFCNT_VALUE(c) >= 1);
507
508     pa_context_ref(c);
509
510     if (c->version < 15) {
511         pa_context_fail(c, PA_ERR_PROTOCOL);
512         goto finish;
513     }
514
515     if (pa_tagstruct_getu32(t, &channel) < 0) {
516         pa_context_fail(c, PA_ERR_PROTOCOL);
517         goto finish;
518     }
519
520     if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
521         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
522             pa_tagstruct_getu32(t, &fragsize) < 0 ||
523             pa_tagstruct_get_usec(t, &usec) < 0) {
524             pa_context_fail(c, PA_ERR_PROTOCOL);
525             goto finish;
526         }
527     } else {
528         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
529             pa_tagstruct_getu32(t, &tlength) < 0 ||
530             pa_tagstruct_getu32(t, &prebuf) < 0 ||
531             pa_tagstruct_getu32(t, &minreq) < 0 ||
532             pa_tagstruct_get_usec(t, &usec) < 0) {
533             pa_context_fail(c, PA_ERR_PROTOCOL);
534             goto finish;
535         }
536     }
537
538     if (!pa_tagstruct_eof(t)) {
539         pa_context_fail(c, PA_ERR_PROTOCOL);
540         goto finish;
541     }
542
543     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, channel)))
544         goto finish;
545
546     if (s->state != PA_STREAM_READY)
547         goto finish;
548
549     if (s->direction == PA_STREAM_RECORD)
550         s->timing_info.configured_source_usec = usec;
551     else
552         s->timing_info.configured_sink_usec = usec;
553
554     s->buffer_attr.maxlength = maxlength;
555     s->buffer_attr.fragsize = fragsize;
556     s->buffer_attr.tlength = tlength;
557     s->buffer_attr.prebuf = prebuf;
558     s->buffer_attr.minreq = minreq;
559
560     request_auto_timing_update(s, TRUE);
561
562     if (s->buffer_attr_callback)
563         s->buffer_attr_callback(s, s->buffer_attr_userdata);
564
565 finish:
566     pa_context_unref(c);
567 }
568
569 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
570     pa_context *c = userdata;
571     pa_stream *s;
572     uint32_t channel;
573     pa_bool_t suspended;
574
575     pa_assert(pd);
576     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
577     pa_assert(t);
578     pa_assert(c);
579     pa_assert(PA_REFCNT_VALUE(c) >= 1);
580
581     pa_context_ref(c);
582
583     if (c->version < 12) {
584         pa_context_fail(c, PA_ERR_PROTOCOL);
585         goto finish;
586     }
587
588     if (pa_tagstruct_getu32(t, &channel) < 0 ||
589         pa_tagstruct_get_boolean(t, &suspended) < 0 ||
590         !pa_tagstruct_eof(t)) {
591         pa_context_fail(c, PA_ERR_PROTOCOL);
592         goto finish;
593     }
594
595     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, channel)))
596         goto finish;
597
598     if (s->state != PA_STREAM_READY)
599         goto finish;
600
601     s->suspended = suspended;
602
603     check_smoother_status(s, TRUE, FALSE, FALSE);
604     request_auto_timing_update(s, TRUE);
605
606     if (s->suspended_callback)
607         s->suspended_callback(s, s->suspended_userdata);
608
609 finish:
610     pa_context_unref(c);
611 }
612
613 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
614     pa_context *c = userdata;
615     pa_stream *s;
616     uint32_t channel;
617
618     pa_assert(pd);
619     pa_assert(command == PA_COMMAND_STARTED);
620     pa_assert(t);
621     pa_assert(c);
622     pa_assert(PA_REFCNT_VALUE(c) >= 1);
623
624     pa_context_ref(c);
625
626     if (c->version < 13) {
627         pa_context_fail(c, PA_ERR_PROTOCOL);
628         goto finish;
629     }
630
631     if (pa_tagstruct_getu32(t, &channel) < 0 ||
632         !pa_tagstruct_eof(t)) {
633         pa_context_fail(c, PA_ERR_PROTOCOL);
634         goto finish;
635     }
636
637     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
638         goto finish;
639
640     if (s->state != PA_STREAM_READY)
641         goto finish;
642
643     check_smoother_status(s, TRUE, TRUE, FALSE);
644     request_auto_timing_update(s, TRUE);
645
646     if (s->started_callback)
647         s->started_callback(s, s->started_userdata);
648
649 finish:
650     pa_context_unref(c);
651 }
652
653 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
654     pa_context *c = userdata;
655     pa_stream *s;
656     uint32_t channel;
657     pa_proplist *pl = NULL;
658     const char *event;
659
660     pa_assert(pd);
661     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
662     pa_assert(t);
663     pa_assert(c);
664     pa_assert(PA_REFCNT_VALUE(c) >= 1);
665
666     pa_context_ref(c);
667
668     if (c->version < 15) {
669         pa_context_fail(c, PA_ERR_PROTOCOL);
670         goto finish;
671     }
672
673     pl = pa_proplist_new();
674
675     if (pa_tagstruct_getu32(t, &channel) < 0 ||
676         pa_tagstruct_gets(t, &event) < 0 ||
677         pa_tagstruct_get_proplist(t, pl) < 0 ||
678         !pa_tagstruct_eof(t) || !event) {
679         pa_context_fail(c, PA_ERR_PROTOCOL);
680         goto finish;
681     }
682
683     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, channel)))
684         goto finish;
685
686     if (s->state != PA_STREAM_READY)
687         goto finish;
688
689     if (s->event_callback)
690         s->event_callback(s, event, pl, s->event_userdata);
691
692 finish:
693     pa_context_unref(c);
694
695     if (pl)
696         pa_proplist_free(pl);
697 }
698
699 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
700     pa_stream *s;
701     pa_context *c = userdata;
702     uint32_t bytes, channel;
703
704     pa_assert(pd);
705     pa_assert(command == PA_COMMAND_REQUEST);
706     pa_assert(t);
707     pa_assert(c);
708     pa_assert(PA_REFCNT_VALUE(c) >= 1);
709
710     pa_context_ref(c);
711
712     if (pa_tagstruct_getu32(t, &channel) < 0 ||
713         pa_tagstruct_getu32(t, &bytes) < 0 ||
714         !pa_tagstruct_eof(t)) {
715         pa_context_fail(c, PA_ERR_PROTOCOL);
716         goto finish;
717     }
718
719     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
720         goto finish;
721
722     if (s->state != PA_STREAM_READY)
723         goto finish;
724
725     s->requested_bytes += bytes;
726
727     if (s->requested_bytes > 0 && s->write_callback)
728         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
729
730 finish:
731     pa_context_unref(c);
732 }
733
734 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
735     pa_stream *s;
736     pa_context *c = userdata;
737     uint32_t channel;
738
739     pa_assert(pd);
740     pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
741     pa_assert(t);
742     pa_assert(c);
743     pa_assert(PA_REFCNT_VALUE(c) >= 1);
744
745     pa_context_ref(c);
746
747     if (pa_tagstruct_getu32(t, &channel) < 0 ||
748         !pa_tagstruct_eof(t)) {
749         pa_context_fail(c, PA_ERR_PROTOCOL);
750         goto finish;
751     }
752
753     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
754         goto finish;
755
756     if (s->state != PA_STREAM_READY)
757         goto finish;
758
759     if (s->buffer_attr.prebuf > 0)
760         check_smoother_status(s, TRUE, FALSE, TRUE);
761
762     request_auto_timing_update(s, TRUE);
763
764     if (command == PA_COMMAND_OVERFLOW) {
765         if (s->overflow_callback)
766             s->overflow_callback(s, s->overflow_userdata);
767     } else if (command == PA_COMMAND_UNDERFLOW) {
768         if (s->underflow_callback)
769             s->underflow_callback(s, s->underflow_userdata);
770     }
771
772  finish:
773     pa_context_unref(c);
774 }
775
776 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
777     pa_assert(s);
778     pa_assert(PA_REFCNT_VALUE(s) >= 1);
779
780 /*     pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
781
782     if (s->state != PA_STREAM_READY)
783         return;
784
785     if (w) {
786         s->write_index_not_before = s->context->ctag;
787
788         if (s->timing_info_valid)
789             s->timing_info.write_index_corrupt = TRUE;
790
791 /*         pa_log("write_index invalidated"); */
792     }
793
794     if (r) {
795         s->read_index_not_before = s->context->ctag;
796
797         if (s->timing_info_valid)
798             s->timing_info.read_index_corrupt = TRUE;
799
800 /*         pa_log("read_index invalidated"); */
801     }
802
803     request_auto_timing_update(s, TRUE);
804 }
805
806 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
807     pa_stream *s = userdata;
808
809     pa_assert(s);
810     pa_assert(PA_REFCNT_VALUE(s) >= 1);
811
812     pa_stream_ref(s);
813     request_auto_timing_update(s, FALSE);
814     pa_stream_unref(s);
815 }
816
817 static void create_stream_complete(pa_stream *s) {
818     pa_assert(s);
819     pa_assert(PA_REFCNT_VALUE(s) >= 1);
820     pa_assert(s->state == PA_STREAM_CREATING);
821
822     pa_stream_set_state(s, PA_STREAM_READY);
823
824     if (s->requested_bytes > 0 && s->write_callback)
825         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
826
827     if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
828         s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
829         pa_assert(!s->auto_timing_update_event);
830         s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
831
832         request_auto_timing_update(s, TRUE);
833     }
834
835     check_smoother_status(s, TRUE, FALSE, FALSE);
836 }
837
838 static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_sample_spec *ss) {
839     pa_assert(s);
840     pa_assert(attr);
841     pa_assert(ss);
842
843     if (s->context->version >= 13)
844         return;
845
846     /* Version older than 0.9.10 didn't do server side buffer_attr
847      * selection, hence we have to fake it on the client side. */
848
849     /* We choose fairly conservative values here, to not confuse
850      * old clients with extremely large playback buffers */
851
852     if (attr->maxlength == (uint32_t) -1)
853         attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
854
855     if (attr->tlength == (uint32_t) -1)
856         attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
857
858     if (attr->minreq == (uint32_t) -1)
859         attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
860
861     if (attr->prebuf == (uint32_t) -1)
862         attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
863
864     if (attr->fragsize == (uint32_t) -1)
865         attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
866 }
867
868 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
869     pa_stream *s = userdata;
870     uint32_t requested_bytes = 0;
871
872     pa_assert(pd);
873     pa_assert(s);
874     pa_assert(PA_REFCNT_VALUE(s) >= 1);
875     pa_assert(s->state == PA_STREAM_CREATING);
876
877     pa_stream_ref(s);
878
879     if (command != PA_COMMAND_REPLY) {
880         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
881             goto finish;
882
883         pa_stream_set_state(s, PA_STREAM_FAILED);
884         goto finish;
885     }
886
887     if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
888         s->channel == PA_INVALID_INDEX ||
889         ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 ||  s->stream_index == PA_INVALID_INDEX)) ||
890         ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
891         pa_context_fail(s->context, PA_ERR_PROTOCOL);
892         goto finish;
893     }
894
895     s->requested_bytes = (int64_t) requested_bytes;
896
897     if (s->context->version >= 9) {
898         if (s->direction == PA_STREAM_PLAYBACK) {
899             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
900                 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
901                 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
902                 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
903                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
904                 goto finish;
905             }
906         } else if (s->direction == PA_STREAM_RECORD) {
907             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
908                 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
909                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
910                 goto finish;
911             }
912         }
913     }
914
915     if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
916         pa_sample_spec ss;
917         pa_channel_map cm;
918         const char *dn = NULL;
919         pa_bool_t suspended;
920
921         if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
922             pa_tagstruct_get_channel_map(t, &cm) < 0 ||
923             pa_tagstruct_getu32(t, &s->device_index) < 0 ||
924             pa_tagstruct_gets(t, &dn) < 0 ||
925             pa_tagstruct_get_boolean(t, &suspended) < 0) {
926             pa_context_fail(s->context, PA_ERR_PROTOCOL);
927             goto finish;
928         }
929
930         if (!dn || s->device_index == PA_INVALID_INDEX ||
931             ss.channels != cm.channels ||
932             !pa_channel_map_valid(&cm) ||
933             !pa_sample_spec_valid(&ss) ||
934             (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
935             (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
936             (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))) {
937             pa_context_fail(s->context, PA_ERR_PROTOCOL);
938             goto finish;
939         }
940
941         pa_xfree(s->device_name);
942         s->device_name = pa_xstrdup(dn);
943         s->suspended = suspended;
944
945         s->channel_map = cm;
946         s->sample_spec = ss;
947     }
948
949     if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
950         pa_usec_t usec;
951
952         if (pa_tagstruct_get_usec(t, &usec) < 0) {
953             pa_context_fail(s->context, PA_ERR_PROTOCOL);
954             goto finish;
955         }
956
957         if (s->direction == PA_STREAM_RECORD)
958             s->timing_info.configured_source_usec = usec;
959         else
960             s->timing_info.configured_sink_usec = usec;
961     }
962
963     if (!pa_tagstruct_eof(t)) {
964         pa_context_fail(s->context, PA_ERR_PROTOCOL);
965         goto finish;
966     }
967
968     if (s->direction == PA_STREAM_RECORD) {
969         pa_assert(!s->record_memblockq);
970
971         s->record_memblockq = pa_memblockq_new(
972                 0,
973                 s->buffer_attr.maxlength,
974                 0,
975                 pa_frame_size(&s->sample_spec),
976                 1,
977                 0,
978                 0,
979                 NULL);
980     }
981
982     s->channel_valid = TRUE;
983     pa_dynarray_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, s->channel, s);
984
985     create_stream_complete(s);
986
987 finish:
988     pa_stream_unref(s);
989 }
990
991 static int create_stream(
992         pa_stream_direction_t direction,
993         pa_stream *s,
994         const char *dev,
995         const pa_buffer_attr *attr,
996         pa_stream_flags_t flags,
997         const pa_cvolume *volume,
998         pa_stream *sync_stream) {
999
1000     pa_tagstruct *t;
1001     uint32_t tag;
1002     pa_bool_t volume_set = FALSE;
1003
1004     pa_assert(s);
1005     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1006     pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1007
1008     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1009     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1010     PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1011     PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1012                                               PA_STREAM_INTERPOLATE_TIMING|
1013                                               PA_STREAM_NOT_MONOTONIC|
1014                                               PA_STREAM_AUTO_TIMING_UPDATE|
1015                                               PA_STREAM_NO_REMAP_CHANNELS|
1016                                               PA_STREAM_NO_REMIX_CHANNELS|
1017                                               PA_STREAM_FIX_FORMAT|
1018                                               PA_STREAM_FIX_RATE|
1019                                               PA_STREAM_FIX_CHANNELS|
1020                                               PA_STREAM_DONT_MOVE|
1021                                               PA_STREAM_VARIABLE_RATE|
1022                                               PA_STREAM_PEAK_DETECT|
1023                                               PA_STREAM_START_MUTED|
1024                                               PA_STREAM_ADJUST_LATENCY|
1025                                               PA_STREAM_EARLY_REQUESTS|
1026                                               PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1027                                               PA_STREAM_START_UNMUTED|
1028                                               PA_STREAM_FAIL_ON_SUSPEND)), PA_ERR_INVALID);
1029
1030     PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1031     PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1032     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1033     /* Althought some of the other flags are not supported on older
1034      * version, we don't check for them here, because it doesn't hurt
1035      * when they are passed but actually not supported. This makes
1036      * client development easier */
1037
1038     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1039     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1040     PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
1041     PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1042     PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1043
1044     pa_stream_ref(s);
1045
1046     s->direction = direction;
1047     s->flags = flags;
1048     s->corked = !!(flags & PA_STREAM_START_CORKED);
1049
1050     if (sync_stream)
1051         s->syncid = sync_stream->syncid;
1052
1053     if (attr)
1054         s->buffer_attr = *attr;
1055     automatic_buffer_attr(s, &s->buffer_attr, &s->sample_spec);
1056
1057     if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1058         pa_usec_t x;
1059
1060         x = pa_rtclock_now();
1061
1062         pa_assert(!s->smoother);
1063         s->smoother = pa_smoother_new(
1064                 SMOOTHER_ADJUST_TIME,
1065                 SMOOTHER_HISTORY_TIME,
1066                 !(flags & PA_STREAM_NOT_MONOTONIC),
1067                 TRUE,
1068                 SMOOTHER_MIN_HISTORY,
1069                 x,
1070                 TRUE);
1071     }
1072
1073     if (!dev)
1074         dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1075
1076     t = pa_tagstruct_command(
1077             s->context,
1078             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1079             &tag);
1080
1081     if (s->context->version < 13)
1082         pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1083
1084     pa_tagstruct_put(
1085             t,
1086             PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1087             PA_TAG_CHANNEL_MAP, &s->channel_map,
1088             PA_TAG_U32, PA_INVALID_INDEX,
1089             PA_TAG_STRING, dev,
1090             PA_TAG_U32, s->buffer_attr.maxlength,
1091             PA_TAG_BOOLEAN, s->corked,
1092             PA_TAG_INVALID);
1093
1094     if (s->direction == PA_STREAM_PLAYBACK) {
1095         pa_cvolume cv;
1096
1097         pa_tagstruct_put(
1098                 t,
1099                 PA_TAG_U32, s->buffer_attr.tlength,
1100                 PA_TAG_U32, s->buffer_attr.prebuf,
1101                 PA_TAG_U32, s->buffer_attr.minreq,
1102                 PA_TAG_U32, s->syncid,
1103                 PA_TAG_INVALID);
1104
1105         volume_set = !!volume;
1106
1107         if (!volume)
1108             volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1109
1110         pa_tagstruct_put_cvolume(t, volume);
1111     } else
1112         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1113
1114     if (s->context->version >= 12) {
1115         pa_tagstruct_put(
1116                 t,
1117                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1118                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1119                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1120                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1121                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1122                 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1123                 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1124                 PA_TAG_INVALID);
1125     }
1126
1127     if (s->context->version >= 13) {
1128
1129         if (s->direction == PA_STREAM_PLAYBACK)
1130             pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1131         else
1132             pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1133
1134         pa_tagstruct_put(
1135                 t,
1136                 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1137                 PA_TAG_PROPLIST, s->proplist,
1138                 PA_TAG_INVALID);
1139
1140         if (s->direction == PA_STREAM_RECORD)
1141             pa_tagstruct_putu32(t, s->direct_on_input);
1142     }
1143
1144     if (s->context->version >= 14) {
1145
1146         if (s->direction == PA_STREAM_PLAYBACK)
1147             pa_tagstruct_put_boolean(t, volume_set);
1148
1149         pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1150     }
1151
1152     if (s->context->version >= 15) {
1153
1154         if (s->direction == PA_STREAM_PLAYBACK)
1155             pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1156
1157         pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1158         pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1159     }
1160
1161     pa_pstream_send_tagstruct(s->context->pstream, t);
1162     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1163
1164     pa_stream_set_state(s, PA_STREAM_CREATING);
1165
1166     pa_stream_unref(s);
1167     return 0;
1168 }
1169
1170 int pa_stream_connect_playback(
1171         pa_stream *s,
1172         const char *dev,
1173         const pa_buffer_attr *attr,
1174         pa_stream_flags_t flags,
1175         const pa_cvolume *volume,
1176         pa_stream *sync_stream) {
1177
1178     pa_assert(s);
1179     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1180
1181     return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1182 }
1183
1184 int pa_stream_connect_record(
1185         pa_stream *s,
1186         const char *dev,
1187         const pa_buffer_attr *attr,
1188         pa_stream_flags_t flags) {
1189
1190     pa_assert(s);
1191     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1192
1193     return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1194 }
1195
1196 int pa_stream_begin_write(
1197         pa_stream *s,
1198         void **data,
1199         size_t *nbytes) {
1200
1201     pa_assert(s);
1202     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1203
1204     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1205     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1206     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1207     PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1208     PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1209
1210     if (*nbytes != (size_t) -1) {
1211         size_t m, fs;
1212
1213         m = pa_mempool_block_size_max(s->context->mempool);
1214         fs = pa_frame_size(&s->sample_spec);
1215
1216         m = (m / fs) * fs;
1217         if (*nbytes > m)
1218             *nbytes = m;
1219     }
1220
1221     if (!s->write_memblock) {
1222         s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1223         s->write_data = pa_memblock_acquire(s->write_memblock);
1224     }
1225
1226     *data = s->write_data;
1227     *nbytes = pa_memblock_get_length(s->write_memblock);
1228
1229     return 0;
1230 }
1231
1232 int pa_stream_cancel_write(
1233         pa_stream *s) {
1234
1235     pa_assert(s);
1236     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1237
1238     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1239     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1240     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1241     PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1242
1243     pa_assert(s->write_data);
1244
1245     pa_memblock_release(s->write_memblock);
1246     pa_memblock_unref(s->write_memblock);
1247     s->write_memblock = NULL;
1248     s->write_data = NULL;
1249
1250     return 0;
1251 }
1252
1253 int pa_stream_write(
1254         pa_stream *s,
1255         const void *data,
1256         size_t length,
1257         pa_free_cb_t free_cb,
1258         int64_t offset,
1259         pa_seek_mode_t seek) {
1260
1261     pa_assert(s);
1262     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1263     pa_assert(data);
1264
1265     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1266     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1267     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1268     PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1269     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1270     PA_CHECK_VALIDITY(s->context,
1271                       !s->write_memblock ||
1272                       ((data >= s->write_data) &&
1273                        ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1274                       PA_ERR_INVALID);
1275     PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1276
1277     if (s->write_memblock) {
1278         pa_memchunk chunk;
1279
1280         /* pa_stream_write_begin() was called before */
1281
1282         pa_memblock_release(s->write_memblock);
1283
1284         chunk.memblock = s->write_memblock;
1285         chunk.index = (const char *) data - (const char *) s->write_data;
1286         chunk.length = length;
1287
1288         s->write_memblock = NULL;
1289         s->write_data = NULL;
1290
1291         pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1292         pa_memblock_unref(chunk.memblock);
1293
1294     } else {
1295         pa_seek_mode_t t_seek = seek;
1296         int64_t t_offset = offset;
1297         size_t t_length = length;
1298         const void *t_data = data;
1299
1300         /* pa_stream_write_begin() was not called before */
1301
1302         while (t_length > 0) {
1303             pa_memchunk chunk;
1304
1305             chunk.index = 0;
1306
1307             if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1308                 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1309                 chunk.length = t_length;
1310             } else {
1311                 void *d;
1312
1313                 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1314                 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1315
1316                 d = pa_memblock_acquire(chunk.memblock);
1317                 memcpy(d, t_data, chunk.length);
1318                 pa_memblock_release(chunk.memblock);
1319             }
1320
1321             pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1322
1323             t_offset = 0;
1324             t_seek = PA_SEEK_RELATIVE;
1325
1326             t_data = (const uint8_t*) t_data + chunk.length;
1327             t_length -= chunk.length;
1328
1329             pa_memblock_unref(chunk.memblock);
1330         }
1331
1332         if (free_cb && pa_pstream_get_shm(s->context->pstream))
1333             free_cb((void*) data);
1334     }
1335
1336     /* This is obviously wrong since we ignore the seeking index . But
1337      * that's OK, the server side applies the same error */
1338     s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1339
1340     if (s->direction == PA_STREAM_PLAYBACK) {
1341
1342         /* Update latency request correction */
1343         if (s->write_index_corrections[s->current_write_index_correction].valid) {
1344
1345             if (seek == PA_SEEK_ABSOLUTE) {
1346                 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1347                 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1348                 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1349             } else if (seek == PA_SEEK_RELATIVE) {
1350                 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1351                     s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1352             } else
1353                 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1354         }
1355
1356         /* Update the write index in the already available latency data */
1357         if (s->timing_info_valid) {
1358
1359             if (seek == PA_SEEK_ABSOLUTE) {
1360                 s->timing_info.write_index_corrupt = FALSE;
1361                 s->timing_info.write_index = offset + (int64_t) length;
1362             } else if (seek == PA_SEEK_RELATIVE) {
1363                 if (!s->timing_info.write_index_corrupt)
1364                     s->timing_info.write_index += offset + (int64_t) length;
1365             } else
1366                 s->timing_info.write_index_corrupt = TRUE;
1367         }
1368
1369         if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1370             request_auto_timing_update(s, TRUE);
1371     }
1372
1373     return 0;
1374 }
1375
1376 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1377     pa_assert(s);
1378     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1379     pa_assert(data);
1380     pa_assert(length);
1381
1382     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1383     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1384     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1385
1386     if (!s->peek_memchunk.memblock) {
1387
1388         if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1389             *data = NULL;
1390             *length = 0;
1391             return 0;
1392         }
1393
1394         s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1395     }
1396
1397     pa_assert(s->peek_data);
1398     *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1399     *length = s->peek_memchunk.length;
1400     return 0;
1401 }
1402
1403 int pa_stream_drop(pa_stream *s) {
1404     pa_assert(s);
1405     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1406
1407     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1408     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1409     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1410     PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1411
1412     pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1413
1414     /* Fix the simulated local read index */
1415     if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1416         s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1417
1418     pa_assert(s->peek_data);
1419     pa_memblock_release(s->peek_memchunk.memblock);
1420     pa_memblock_unref(s->peek_memchunk.memblock);
1421     pa_memchunk_reset(&s->peek_memchunk);
1422
1423     return 0;
1424 }
1425
1426 size_t pa_stream_writable_size(pa_stream *s) {
1427     pa_assert(s);
1428     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1429
1430     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1431     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1432     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1433
1434     return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1435 }
1436
1437 size_t pa_stream_readable_size(pa_stream *s) {
1438     pa_assert(s);
1439     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1440
1441     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1442     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1443     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1444
1445     return pa_memblockq_get_length(s->record_memblockq);
1446 }
1447
1448 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1449     pa_operation *o;
1450     pa_tagstruct *t;
1451     uint32_t tag;
1452
1453     pa_assert(s);
1454     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1455
1456     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1457     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1458     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1459
1460     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1461
1462     t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1463     pa_tagstruct_putu32(t, s->channel);
1464     pa_pstream_send_tagstruct(s->context->pstream, t);
1465     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1466
1467     return o;
1468 }
1469
1470 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1471     pa_usec_t usec;
1472
1473     pa_assert(s);
1474     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1475     pa_assert(s->state == PA_STREAM_READY);
1476     pa_assert(s->direction != PA_STREAM_UPLOAD);
1477     pa_assert(s->timing_info_valid);
1478     pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1479     pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1480
1481     if (s->direction == PA_STREAM_PLAYBACK) {
1482         /* The last byte that was written into the output device
1483          * had this time value associated */
1484         usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1485
1486         if (!s->corked && !s->suspended) {
1487
1488             if (!ignore_transport)
1489                 /* Because the latency info took a little time to come
1490                  * to us, we assume that the real output time is actually
1491                  * a little ahead */
1492                 usec += s->timing_info.transport_usec;
1493
1494             /* However, the output device usually maintains a buffer
1495                too, hence the real sample currently played is a little
1496                back  */
1497             if (s->timing_info.sink_usec >= usec)
1498                 usec = 0;
1499             else
1500                 usec -= s->timing_info.sink_usec;
1501         }
1502
1503     } else {
1504         pa_assert(s->direction == PA_STREAM_RECORD);
1505
1506         /* The last byte written into the server side queue had
1507          * this time value associated */
1508         usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1509
1510         if (!s->corked && !s->suspended) {
1511
1512             if (!ignore_transport)
1513                 /* Add transport latency */
1514                 usec += s->timing_info.transport_usec;
1515
1516             /* Add latency of data in device buffer */
1517             usec += s->timing_info.source_usec;
1518
1519             /* If this is a monitor source, we need to correct the
1520              * time by the playback device buffer */
1521             if (s->timing_info.sink_usec >= usec)
1522                 usec = 0;
1523             else
1524                 usec -= s->timing_info.sink_usec;
1525         }
1526     }
1527
1528     return usec;
1529 }
1530
1531 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1532     pa_operation *o = userdata;
1533     struct timeval local, remote, now;
1534     pa_timing_info *i;
1535     pa_bool_t playing = FALSE;
1536     uint64_t underrun_for = 0, playing_for = 0;
1537
1538     pa_assert(pd);
1539     pa_assert(o);
1540     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1541
1542     if (!o->context || !o->stream)
1543         goto finish;
1544
1545     i = &o->stream->timing_info;
1546
1547     o->stream->timing_info_valid = FALSE;
1548     i->write_index_corrupt = TRUE;
1549     i->read_index_corrupt = TRUE;
1550
1551     if (command != PA_COMMAND_REPLY) {
1552         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1553             goto finish;
1554
1555     } else {
1556
1557         if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1558             pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1559             pa_tagstruct_get_boolean(t, &playing) < 0 ||
1560             pa_tagstruct_get_timeval(t, &local) < 0 ||
1561             pa_tagstruct_get_timeval(t, &remote) < 0 ||
1562             pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1563             pa_tagstruct_gets64(t, &i->read_index) < 0) {
1564
1565             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1566             goto finish;
1567         }
1568
1569         if (o->context->version >= 13 &&
1570             o->stream->direction == PA_STREAM_PLAYBACK)
1571             if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1572                 pa_tagstruct_getu64(t, &playing_for) < 0) {
1573
1574                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1575                 goto finish;
1576             }
1577
1578
1579         if (!pa_tagstruct_eof(t)) {
1580             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1581             goto finish;
1582         }
1583         o->stream->timing_info_valid = TRUE;
1584         i->write_index_corrupt = FALSE;
1585         i->read_index_corrupt = FALSE;
1586
1587         i->playing = (int) playing;
1588         i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1589
1590         pa_gettimeofday(&now);
1591
1592         /* Calculcate timestamps */
1593         if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1594             /* local and remote seem to have synchronized clocks */
1595
1596             if (o->stream->direction == PA_STREAM_PLAYBACK)
1597                 i->transport_usec = pa_timeval_diff(&remote, &local);
1598             else
1599                 i->transport_usec = pa_timeval_diff(&now, &remote);
1600
1601             i->synchronized_clocks = TRUE;
1602             i->timestamp = remote;
1603         } else {
1604             /* clocks are not synchronized, let's estimate latency then */
1605             i->transport_usec = pa_timeval_diff(&now, &local)/2;
1606             i->synchronized_clocks = FALSE;
1607             i->timestamp = local;
1608             pa_timeval_add(&i->timestamp, i->transport_usec);
1609         }
1610
1611         /* Invalidate read and write indexes if necessary */
1612         if (tag < o->stream->read_index_not_before)
1613             i->read_index_corrupt = TRUE;
1614
1615         if (tag < o->stream->write_index_not_before)
1616             i->write_index_corrupt = TRUE;
1617
1618         if (o->stream->direction == PA_STREAM_PLAYBACK) {
1619             /* Write index correction */
1620
1621             int n, j;
1622             uint32_t ctag = tag;
1623
1624             /* Go through the saved correction values and add up the
1625              * total correction.*/
1626             for (n = 0, j = o->stream->current_write_index_correction+1;
1627                  n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1628                  n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1629
1630                 /* Step over invalid data or out-of-date data */
1631                 if (!o->stream->write_index_corrections[j].valid ||
1632                     o->stream->write_index_corrections[j].tag < ctag)
1633                     continue;
1634
1635                 /* Make sure that everything is in order */
1636                 ctag = o->stream->write_index_corrections[j].tag+1;
1637
1638                 /* Now fix the write index */
1639                 if (o->stream->write_index_corrections[j].corrupt) {
1640                     /* A corrupting seek was made */
1641                     i->write_index_corrupt = TRUE;
1642                 } else if (o->stream->write_index_corrections[j].absolute) {
1643                     /* An absolute seek was made */
1644                     i->write_index = o->stream->write_index_corrections[j].value;
1645                     i->write_index_corrupt = FALSE;
1646                 } else if (!i->write_index_corrupt) {
1647                     /* A relative seek was made */
1648                     i->write_index += o->stream->write_index_corrections[j].value;
1649                 }
1650             }
1651
1652             /* Clear old correction entries */
1653             for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1654                 if (!o->stream->write_index_corrections[n].valid)
1655                     continue;
1656
1657                 if (o->stream->write_index_corrections[n].tag <= tag)
1658                     o->stream->write_index_corrections[n].valid = FALSE;
1659             }
1660         }
1661
1662         if (o->stream->direction == PA_STREAM_RECORD) {
1663             /* Read index correction */
1664
1665             if (!i->read_index_corrupt)
1666                 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1667         }
1668
1669         /* Update smoother */
1670         if (o->stream->smoother) {
1671             pa_usec_t u, x;
1672
1673             u = x = pa_rtclock_now() - i->transport_usec;
1674
1675             if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1676                 pa_usec_t su;
1677
1678                 /* If we weren't playing then it will take some time
1679                  * until the audio will actually come out through the
1680                  * speakers. Since we follow that timing here, we need
1681                  * to try to fix this up */
1682
1683                 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1684
1685                 if (su < i->sink_usec)
1686                     x += i->sink_usec - su;
1687             }
1688
1689             if (!i->playing)
1690                 pa_smoother_pause(o->stream->smoother, x);
1691
1692             /* Update the smoother */
1693             if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1694                 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1695                 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1696
1697             if (i->playing)
1698                 pa_smoother_resume(o->stream->smoother, x, TRUE);
1699         }
1700     }
1701
1702     o->stream->auto_timing_update_requested = FALSE;
1703
1704     if (o->stream->latency_update_callback)
1705         o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1706
1707     if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1708         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1709         cb(o->stream, o->stream->timing_info_valid, o->userdata);
1710     }
1711
1712 finish:
1713
1714     pa_operation_done(o);
1715     pa_operation_unref(o);
1716 }
1717
1718 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1719     uint32_t tag;
1720     pa_operation *o;
1721     pa_tagstruct *t;
1722     struct timeval now;
1723     int cidx = 0;
1724
1725     pa_assert(s);
1726     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1727
1728     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1729     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1730     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1731
1732     if (s->direction == PA_STREAM_PLAYBACK) {
1733         /* Find a place to store the write_index correction data for this entry */
1734         cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1735
1736         /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1737         PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1738     }
1739     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1740
1741     t = pa_tagstruct_command(
1742             s->context,
1743             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1744             &tag);
1745     pa_tagstruct_putu32(t, s->channel);
1746     pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1747
1748     pa_pstream_send_tagstruct(s->context->pstream, t);
1749     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1750
1751     if (s->direction == PA_STREAM_PLAYBACK) {
1752         /* Fill in initial correction data */
1753
1754         s->current_write_index_correction = cidx;
1755
1756         s->write_index_corrections[cidx].valid = TRUE;
1757         s->write_index_corrections[cidx].absolute = FALSE;
1758         s->write_index_corrections[cidx].corrupt = FALSE;
1759         s->write_index_corrections[cidx].tag = tag;
1760         s->write_index_corrections[cidx].value = 0;
1761     }
1762
1763     return o;
1764 }
1765
1766 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1767     pa_stream *s = userdata;
1768
1769     pa_assert(pd);
1770     pa_assert(s);
1771     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1772
1773     pa_stream_ref(s);
1774
1775     if (command != PA_COMMAND_REPLY) {
1776         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1777             goto finish;
1778
1779         pa_stream_set_state(s, PA_STREAM_FAILED);
1780         goto finish;
1781     } else if (!pa_tagstruct_eof(t)) {
1782         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1783         goto finish;
1784     }
1785
1786     pa_stream_set_state(s, PA_STREAM_TERMINATED);
1787
1788 finish:
1789     pa_stream_unref(s);
1790 }
1791
1792 int pa_stream_disconnect(pa_stream *s) {
1793     pa_tagstruct *t;
1794     uint32_t tag;
1795
1796     pa_assert(s);
1797     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1798
1799     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1800     PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1801     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1802
1803     pa_stream_ref(s);
1804
1805     t = pa_tagstruct_command(
1806             s->context,
1807             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1808                         (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1809             &tag);
1810     pa_tagstruct_putu32(t, s->channel);
1811     pa_pstream_send_tagstruct(s->context->pstream, t);
1812     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1813
1814     pa_stream_unref(s);
1815     return 0;
1816 }
1817
1818 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1819     pa_assert(s);
1820     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1821
1822     if (pa_detect_fork())
1823         return;
1824
1825     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1826         return;
1827
1828     s->read_callback = cb;
1829     s->read_userdata = userdata;
1830 }
1831
1832 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1833     pa_assert(s);
1834     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1835
1836     if (pa_detect_fork())
1837         return;
1838
1839     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1840         return;
1841
1842     s->write_callback = cb;
1843     s->write_userdata = userdata;
1844 }
1845
1846 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1847     pa_assert(s);
1848     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1849
1850     if (pa_detect_fork())
1851         return;
1852
1853     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1854         return;
1855
1856     s->state_callback = cb;
1857     s->state_userdata = userdata;
1858 }
1859
1860 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1861     pa_assert(s);
1862     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1863
1864     if (pa_detect_fork())
1865         return;
1866
1867     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1868         return;
1869
1870     s->overflow_callback = cb;
1871     s->overflow_userdata = userdata;
1872 }
1873
1874 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1875     pa_assert(s);
1876     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1877
1878     if (pa_detect_fork())
1879         return;
1880
1881     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1882         return;
1883
1884     s->underflow_callback = cb;
1885     s->underflow_userdata = userdata;
1886 }
1887
1888 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1889     pa_assert(s);
1890     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1891
1892     if (pa_detect_fork())
1893         return;
1894
1895     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1896         return;
1897
1898     s->latency_update_callback = cb;
1899     s->latency_update_userdata = userdata;
1900 }
1901
1902 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1903     pa_assert(s);
1904     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1905
1906     if (pa_detect_fork())
1907         return;
1908
1909     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1910         return;
1911
1912     s->moved_callback = cb;
1913     s->moved_userdata = userdata;
1914 }
1915
1916 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1917     pa_assert(s);
1918     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1919
1920     if (pa_detect_fork())
1921         return;
1922
1923     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1924         return;
1925
1926     s->suspended_callback = cb;
1927     s->suspended_userdata = userdata;
1928 }
1929
1930 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1931     pa_assert(s);
1932     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1933
1934     if (pa_detect_fork())
1935         return;
1936
1937     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1938         return;
1939
1940     s->started_callback = cb;
1941     s->started_userdata = userdata;
1942 }
1943
1944 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
1945     pa_assert(s);
1946     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1947
1948     if (pa_detect_fork())
1949         return;
1950
1951     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1952         return;
1953
1954     s->event_callback = cb;
1955     s->event_userdata = userdata;
1956 }
1957
1958 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1959     pa_assert(s);
1960     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1961
1962     if (pa_detect_fork())
1963         return;
1964
1965     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1966         return;
1967
1968     s->buffer_attr_callback = cb;
1969     s->buffer_attr_userdata = userdata;
1970 }
1971
1972 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1973     pa_operation *o = userdata;
1974     int success = 1;
1975
1976     pa_assert(pd);
1977     pa_assert(o);
1978     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1979
1980     if (!o->context)
1981         goto finish;
1982
1983     if (command != PA_COMMAND_REPLY) {
1984         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1985             goto finish;
1986
1987         success = 0;
1988     } else if (!pa_tagstruct_eof(t)) {
1989         pa_context_fail(o->context, PA_ERR_PROTOCOL);
1990         goto finish;
1991     }
1992
1993     if (o->callback) {
1994         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1995         cb(o->stream, success, o->userdata);
1996     }
1997
1998 finish:
1999     pa_operation_done(o);
2000     pa_operation_unref(o);
2001 }
2002
2003 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2004     pa_operation *o;
2005     pa_tagstruct *t;
2006     uint32_t tag;
2007
2008     pa_assert(s);
2009     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2010
2011     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2012     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2013     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2014
2015     s->corked = b;
2016
2017     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2018
2019     t = pa_tagstruct_command(
2020             s->context,
2021             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2022             &tag);
2023     pa_tagstruct_putu32(t, s->channel);
2024     pa_tagstruct_put_boolean(t, !!b);
2025     pa_pstream_send_tagstruct(s->context->pstream, t);
2026     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2027
2028     check_smoother_status(s, FALSE, FALSE, FALSE);
2029
2030     /* This might cause the indexes to hang/start again, hence
2031      * let's request a timing update */
2032     request_auto_timing_update(s, TRUE);
2033
2034     return o;
2035 }
2036
2037 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2038     pa_tagstruct *t;
2039     pa_operation *o;
2040     uint32_t tag;
2041
2042     pa_assert(s);
2043     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2044
2045     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2046     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2047
2048     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2049
2050     t = pa_tagstruct_command(s->context, command, &tag);
2051     pa_tagstruct_putu32(t, s->channel);
2052     pa_pstream_send_tagstruct(s->context->pstream, t);
2053     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2054
2055     return o;
2056 }
2057
2058 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2059     pa_operation *o;
2060
2061     pa_assert(s);
2062     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2063
2064     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2065     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2066     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2067
2068     if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2069         return NULL;
2070
2071     if (s->direction == PA_STREAM_PLAYBACK) {
2072
2073         if (s->write_index_corrections[s->current_write_index_correction].valid)
2074             s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2075
2076         if (s->buffer_attr.prebuf > 0)
2077             check_smoother_status(s, FALSE, FALSE, TRUE);
2078
2079         /* This will change the write index, but leave the
2080          * read index untouched. */
2081         invalidate_indexes(s, FALSE, TRUE);
2082
2083     } else
2084         /* For record streams this has no influence on the write
2085          * index, but the read index might jump. */
2086         invalidate_indexes(s, TRUE, FALSE);
2087
2088     return o;
2089 }
2090
2091 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2092     pa_operation *o;
2093
2094     pa_assert(s);
2095     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2096
2097     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2098     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2099     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2100     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2101
2102     if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2103         return NULL;
2104
2105     /* This might cause the read index to hang again, hence
2106      * let's request a timing update */
2107     request_auto_timing_update(s, TRUE);
2108
2109     return o;
2110 }
2111
2112 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2113     pa_operation *o;
2114
2115     pa_assert(s);
2116     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2117
2118     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2119     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2120     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2121     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2122
2123     if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2124         return NULL;
2125
2126     /* This might cause the read index to start moving again, hence
2127      * let's request a timing update */
2128     request_auto_timing_update(s, TRUE);
2129
2130     return o;
2131 }
2132
2133 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2134     pa_operation *o;
2135
2136     pa_assert(s);
2137     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2138     pa_assert(name);
2139
2140     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2141     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2142     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2143
2144     if (s->context->version >= 13) {
2145         pa_proplist *p = pa_proplist_new();
2146
2147         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2148         o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2149         pa_proplist_free(p);
2150     } else {
2151         pa_tagstruct *t;
2152         uint32_t tag;
2153
2154         o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2155         t = pa_tagstruct_command(
2156                 s->context,
2157                 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2158                 &tag);
2159         pa_tagstruct_putu32(t, s->channel);
2160         pa_tagstruct_puts(t, name);
2161         pa_pstream_send_tagstruct(s->context->pstream, t);
2162         pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2163     }
2164
2165     return o;
2166 }
2167
2168 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2169     pa_usec_t usec;
2170
2171     pa_assert(s);
2172     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2173
2174     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2175     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2176     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2177     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2178     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2179     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2180
2181     if (s->smoother)
2182         usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2183     else
2184         usec = calc_time(s, FALSE);
2185
2186     /* Make sure the time runs monotonically */
2187     if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2188         if (usec < s->previous_time)
2189             usec = s->previous_time;
2190         else
2191             s->previous_time = usec;
2192     }
2193
2194     if (r_usec)
2195         *r_usec = usec;
2196
2197     return 0;
2198 }
2199
2200 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2201     pa_assert(s);
2202     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2203
2204     if (negative)
2205         *negative = 0;
2206
2207     if (a >= b)
2208         return a-b;
2209     else {
2210         if (negative && s->direction == PA_STREAM_RECORD) {
2211             *negative = 1;
2212             return b-a;
2213         } else
2214             return 0;
2215     }
2216 }
2217
2218 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2219     pa_usec_t t, c;
2220     int r;
2221     int64_t cindex;
2222
2223     pa_assert(s);
2224     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2225     pa_assert(r_usec);
2226
2227     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2228     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2229     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2230     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2231     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2232     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2233
2234     if ((r = pa_stream_get_time(s, &t)) < 0)
2235         return r;
2236
2237     if (s->direction == PA_STREAM_PLAYBACK)
2238         cindex = s->timing_info.write_index;
2239     else
2240         cindex = s->timing_info.read_index;
2241
2242     if (cindex < 0)
2243         cindex = 0;
2244
2245     c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2246
2247     if (s->direction == PA_STREAM_PLAYBACK)
2248         *r_usec = time_counter_diff(s, c, t, negative);
2249     else
2250         *r_usec = time_counter_diff(s, t, c, negative);
2251
2252     return 0;
2253 }
2254
2255 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2256     pa_assert(s);
2257     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2258
2259     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2260     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2261     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2262     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2263
2264     return &s->timing_info;
2265 }
2266
2267 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2268     pa_assert(s);
2269     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2270
2271     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2272
2273     return &s->sample_spec;
2274 }
2275
2276 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2277     pa_assert(s);
2278     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2279
2280     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2281
2282     return &s->channel_map;
2283 }
2284
2285 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2286     pa_assert(s);
2287     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2288
2289     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2290     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2291     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2292
2293     return &s->buffer_attr;
2294 }
2295
2296 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2297     pa_operation *o = userdata;
2298     int success = 1;
2299
2300     pa_assert(pd);
2301     pa_assert(o);
2302     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2303
2304     if (!o->context)
2305         goto finish;
2306
2307     if (command != PA_COMMAND_REPLY) {
2308         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2309             goto finish;
2310
2311         success = 0;
2312     } else {
2313         if (o->stream->direction == PA_STREAM_PLAYBACK) {
2314             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2315                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2316                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2317                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2318                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2319                 goto finish;
2320             }
2321         } else if (o->stream->direction == PA_STREAM_RECORD) {
2322             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2323                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2324                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2325                 goto finish;
2326             }
2327         }
2328
2329         if (o->stream->context->version >= 13) {
2330             pa_usec_t usec;
2331
2332             if (pa_tagstruct_get_usec(t, &usec) < 0) {
2333                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2334                 goto finish;
2335             }
2336
2337             if (o->stream->direction == PA_STREAM_RECORD)
2338                 o->stream->timing_info.configured_source_usec = usec;
2339             else
2340                 o->stream->timing_info.configured_sink_usec = usec;
2341         }
2342
2343         if (!pa_tagstruct_eof(t)) {
2344             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2345             goto finish;
2346         }
2347     }
2348
2349     if (o->callback) {
2350         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2351         cb(o->stream, success, o->userdata);
2352     }
2353
2354 finish:
2355     pa_operation_done(o);
2356     pa_operation_unref(o);
2357 }
2358
2359
2360 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2361     pa_operation *o;
2362     pa_tagstruct *t;
2363     uint32_t tag;
2364
2365     pa_assert(s);
2366     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2367     pa_assert(attr);
2368
2369     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2370     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2371     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2372     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2373
2374     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2375
2376     t = pa_tagstruct_command(
2377             s->context,
2378             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2379             &tag);
2380     pa_tagstruct_putu32(t, s->channel);
2381
2382     pa_tagstruct_putu32(t, attr->maxlength);
2383
2384     if (s->direction == PA_STREAM_PLAYBACK)
2385         pa_tagstruct_put(
2386                 t,
2387                 PA_TAG_U32, attr->tlength,
2388                 PA_TAG_U32, attr->prebuf,
2389                 PA_TAG_U32, attr->minreq,
2390                 PA_TAG_INVALID);
2391     else
2392         pa_tagstruct_putu32(t, attr->fragsize);
2393
2394     if (s->context->version >= 13)
2395         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2396
2397     if (s->context->version >= 14)
2398         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2399
2400     pa_pstream_send_tagstruct(s->context->pstream, t);
2401     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2402
2403     /* This might cause changes in the read/write indexex, hence let's
2404      * request a timing update */
2405     request_auto_timing_update(s, TRUE);
2406
2407     return o;
2408 }
2409
2410 uint32_t pa_stream_get_device_index(pa_stream *s) {
2411     pa_assert(s);
2412     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2413
2414     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2415     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2416     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2417     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2418     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2419
2420     return s->device_index;
2421 }
2422
2423 const char *pa_stream_get_device_name(pa_stream *s) {
2424     pa_assert(s);
2425     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2426
2427     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2428     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2429     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2430     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2431     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2432
2433     return s->device_name;
2434 }
2435
2436 int pa_stream_is_suspended(pa_stream *s) {
2437     pa_assert(s);
2438     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2439
2440     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2441     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2442     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2443     PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2444
2445     return s->suspended;
2446 }
2447
2448 int pa_stream_is_corked(pa_stream *s) {
2449     pa_assert(s);
2450     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2451
2452     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2453     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2454     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2455
2456     return s->corked;
2457 }
2458
2459 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2460     pa_operation *o = userdata;
2461     int success = 1;
2462
2463     pa_assert(pd);
2464     pa_assert(o);
2465     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2466
2467     if (!o->context)
2468         goto finish;
2469
2470     if (command != PA_COMMAND_REPLY) {
2471         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2472             goto finish;
2473
2474         success = 0;
2475     } else {
2476
2477         if (!pa_tagstruct_eof(t)) {
2478             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2479             goto finish;
2480         }
2481     }
2482
2483     o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2484     pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2485
2486     if (o->callback) {
2487         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2488         cb(o->stream, success, o->userdata);
2489     }
2490
2491 finish:
2492     pa_operation_done(o);
2493     pa_operation_unref(o);
2494 }
2495
2496
2497 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2498     pa_operation *o;
2499     pa_tagstruct *t;
2500     uint32_t tag;
2501
2502     pa_assert(s);
2503     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2504
2505     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2506     PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2507     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2508     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2509     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2510     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2511
2512     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2513     o->private = PA_UINT_TO_PTR(rate);
2514
2515     t = pa_tagstruct_command(
2516             s->context,
2517             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2518             &tag);
2519     pa_tagstruct_putu32(t, s->channel);
2520     pa_tagstruct_putu32(t, rate);
2521
2522     pa_pstream_send_tagstruct(s->context->pstream, t);
2523     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2524
2525     return o;
2526 }
2527
2528 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2529     pa_operation *o;
2530     pa_tagstruct *t;
2531     uint32_t tag;
2532
2533     pa_assert(s);
2534     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2535
2536     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2537     PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2538     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2539     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2540     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2541
2542     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2543
2544     t = pa_tagstruct_command(
2545             s->context,
2546             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2547             &tag);
2548     pa_tagstruct_putu32(t, s->channel);
2549     pa_tagstruct_putu32(t, (uint32_t) mode);
2550     pa_tagstruct_put_proplist(t, p);
2551
2552     pa_pstream_send_tagstruct(s->context->pstream, t);
2553     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2554
2555     /* Please note that we don't update s->proplist here, because we
2556      * don't export that field */
2557
2558     return o;
2559 }
2560
2561 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2562     pa_operation *o;
2563     pa_tagstruct *t;
2564     uint32_t tag;
2565     const char * const*k;
2566
2567     pa_assert(s);
2568     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2569
2570     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2571     PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2572     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2573     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2574     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2575
2576     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2577
2578     t = pa_tagstruct_command(
2579             s->context,
2580             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2581             &tag);
2582     pa_tagstruct_putu32(t, s->channel);
2583
2584     for (k = keys; *k; k++)
2585         pa_tagstruct_puts(t, *k);
2586
2587     pa_tagstruct_puts(t, NULL);
2588
2589     pa_pstream_send_tagstruct(s->context->pstream, t);
2590     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2591
2592     /* Please note that we don't update s->proplist here, because we
2593      * don't export that field */
2594
2595     return o;
2596 }
2597
2598 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2599     pa_assert(s);
2600     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2601
2602     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2603     PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2604     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2605     PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2606
2607     s->direct_on_input = sink_input_idx;
2608
2609     return 0;
2610 }
2611
2612 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2613     pa_assert(s);
2614     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2615
2616     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2617     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2618     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2619
2620     return s->direct_on_input;
2621 }