pulse: delay smoother update only when unpausing, not when pausing, since we don...
[profile/ivi/pulseaudio.git] / src / pulse / stream.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/rtclock.h>
34 #include <pulse/xmalloc.h>
35
36 #include <pulsecore/pstream-util.h>
37 #include <pulsecore/log.h>
38 #include <pulsecore/hashmap.h>
39 #include <pulsecore/macro.h>
40 #include <pulsecore/core-rtclock.h>
41
42 #include "fork-detect.h"
43 #include "internal.h"
44
45 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
46 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
47
48 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
49 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
50 #define SMOOTHER_MIN_HISTORY (4)
51
52 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
53     return pa_stream_new_with_proplist(c, name, ss, map, NULL);
54 }
55
56 static void reset_callbacks(pa_stream *s) {
57     s->read_callback = NULL;
58     s->read_userdata = NULL;
59     s->write_callback = NULL;
60     s->write_userdata = NULL;
61     s->state_callback = NULL;
62     s->state_userdata = NULL;
63     s->overflow_callback = NULL;
64     s->overflow_userdata = NULL;
65     s->underflow_callback = NULL;
66     s->underflow_userdata = NULL;
67     s->latency_update_callback = NULL;
68     s->latency_update_userdata = NULL;
69     s->moved_callback = NULL;
70     s->moved_userdata = NULL;
71     s->suspended_callback = NULL;
72     s->suspended_userdata = NULL;
73     s->started_callback = NULL;
74     s->started_userdata = NULL;
75     s->event_callback = NULL;
76     s->event_userdata = NULL;
77     s->buffer_attr_callback = NULL;
78     s->buffer_attr_userdata = NULL;
79 }
80
81 pa_stream *pa_stream_new_with_proplist(
82         pa_context *c,
83         const char *name,
84         const pa_sample_spec *ss,
85         const pa_channel_map *map,
86         pa_proplist *p) {
87
88     pa_stream *s;
89     int i;
90     pa_channel_map tmap;
91
92     pa_assert(c);
93     pa_assert(PA_REFCNT_VALUE(c) >= 1);
94
95     PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
96     PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
97     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
98     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
99     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
100     PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
101     PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
102
103     if (!map)
104         PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
105
106     s = pa_xnew(pa_stream, 1);
107     PA_REFCNT_INIT(s);
108     s->context = c;
109     s->mainloop = c->mainloop;
110
111     s->direction = PA_STREAM_NODIRECTION;
112     s->state = PA_STREAM_UNCONNECTED;
113     s->flags = 0;
114
115     s->sample_spec = *ss;
116     s->channel_map = *map;
117
118     s->direct_on_input = PA_INVALID_INDEX;
119
120     s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
121     if (name)
122         pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
123
124     s->channel = 0;
125     s->channel_valid = FALSE;
126     s->syncid = c->csyncid++;
127     s->stream_index = PA_INVALID_INDEX;
128
129     s->requested_bytes = 0;
130     memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
131
132     /* We initialize der target length here, so that if the user
133      * passes no explicit buffering metrics the default is similar to
134      * what older PA versions provided. */
135
136     s->buffer_attr.maxlength = (uint32_t) -1;
137     s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
138     s->buffer_attr.minreq = (uint32_t) -1;
139     s->buffer_attr.prebuf = (uint32_t) -1;
140     s->buffer_attr.fragsize = (uint32_t) -1;
141
142     s->device_index = PA_INVALID_INDEX;
143     s->device_name = NULL;
144     s->suspended = FALSE;
145     s->corked = FALSE;
146
147     s->write_memblock = NULL;
148     s->write_data = NULL;
149
150     pa_memchunk_reset(&s->peek_memchunk);
151     s->peek_data = NULL;
152     s->record_memblockq = NULL;
153
154     memset(&s->timing_info, 0, sizeof(s->timing_info));
155     s->timing_info_valid = FALSE;
156
157     s->previous_time = 0;
158
159     s->read_index_not_before = 0;
160     s->write_index_not_before = 0;
161     for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
162         s->write_index_corrections[i].valid = 0;
163     s->current_write_index_correction = 0;
164
165     s->auto_timing_update_event = NULL;
166     s->auto_timing_update_requested = FALSE;
167     s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
168
169     reset_callbacks(s);
170
171     s->smoother = NULL;
172
173     /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
174     PA_LLIST_PREPEND(pa_stream, c->streams, s);
175     pa_stream_ref(s);
176
177     return s;
178 }
179
180 static void stream_unlink(pa_stream *s) {
181     pa_operation *o, *n;
182     pa_assert(s);
183
184     if (!s->context)
185         return;
186
187     /* Detach from context */
188
189     /* Unref all operatio object that point to us */
190     for (o = s->context->operations; o; o = n) {
191         n = o->next;
192
193         if (o->stream == s)
194             pa_operation_cancel(o);
195     }
196
197     /* Drop all outstanding replies for this stream */
198     if (s->context->pdispatch)
199         pa_pdispatch_unregister_reply(s->context->pdispatch, s);
200
201     if (s->channel_valid) {
202         pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL);
203         s->channel = 0;
204         s->channel_valid = FALSE;
205     }
206
207     PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
208     pa_stream_unref(s);
209
210     s->context = NULL;
211
212     if (s->auto_timing_update_event) {
213         pa_assert(s->mainloop);
214         s->mainloop->time_free(s->auto_timing_update_event);
215     }
216
217     reset_callbacks(s);
218 }
219
220 static void stream_free(pa_stream *s) {
221     pa_assert(s);
222
223     stream_unlink(s);
224
225     if (s->write_memblock) {
226         pa_memblock_release(s->write_memblock);
227         pa_memblock_unref(s->write_data);
228     }
229
230     if (s->peek_memchunk.memblock) {
231         if (s->peek_data)
232             pa_memblock_release(s->peek_memchunk.memblock);
233         pa_memblock_unref(s->peek_memchunk.memblock);
234     }
235
236     if (s->record_memblockq)
237         pa_memblockq_free(s->record_memblockq);
238
239     if (s->proplist)
240         pa_proplist_free(s->proplist);
241
242     if (s->smoother)
243         pa_smoother_free(s->smoother);
244
245     pa_xfree(s->device_name);
246     pa_xfree(s);
247 }
248
249 void pa_stream_unref(pa_stream *s) {
250     pa_assert(s);
251     pa_assert(PA_REFCNT_VALUE(s) >= 1);
252
253     if (PA_REFCNT_DEC(s) <= 0)
254         stream_free(s);
255 }
256
257 pa_stream* pa_stream_ref(pa_stream *s) {
258     pa_assert(s);
259     pa_assert(PA_REFCNT_VALUE(s) >= 1);
260
261     PA_REFCNT_INC(s);
262     return s;
263 }
264
265 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
266     pa_assert(s);
267     pa_assert(PA_REFCNT_VALUE(s) >= 1);
268
269     return s->state;
270 }
271
272 pa_context* pa_stream_get_context(pa_stream *s) {
273     pa_assert(s);
274     pa_assert(PA_REFCNT_VALUE(s) >= 1);
275
276     return s->context;
277 }
278
279 uint32_t pa_stream_get_index(pa_stream *s) {
280     pa_assert(s);
281     pa_assert(PA_REFCNT_VALUE(s) >= 1);
282
283     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
284     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
285
286     return s->stream_index;
287 }
288
289 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
290     pa_assert(s);
291     pa_assert(PA_REFCNT_VALUE(s) >= 1);
292
293     if (s->state == st)
294         return;
295
296     pa_stream_ref(s);
297
298     s->state = st;
299
300     if (s->state_callback)
301         s->state_callback(s, s->state_userdata);
302
303     if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
304         stream_unlink(s);
305
306     pa_stream_unref(s);
307 }
308
309 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
310     pa_assert(s);
311     pa_assert(PA_REFCNT_VALUE(s) >= 1);
312
313     if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
314         return;
315
316     if (s->state == PA_STREAM_READY &&
317         (force || !s->auto_timing_update_requested)) {
318         pa_operation *o;
319
320 /*         pa_log("Automatically requesting new timing data"); */
321
322         if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
323             pa_operation_unref(o);
324             s->auto_timing_update_requested = TRUE;
325         }
326     }
327
328     if (s->auto_timing_update_event) {
329         if (force)
330             s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
331
332         pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
333
334         s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
335     }
336 }
337
338 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
339     pa_context *c = userdata;
340     pa_stream *s;
341     uint32_t channel;
342
343     pa_assert(pd);
344     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
345     pa_assert(t);
346     pa_assert(c);
347     pa_assert(PA_REFCNT_VALUE(c) >= 1);
348
349     pa_context_ref(c);
350
351     if (pa_tagstruct_getu32(t, &channel) < 0 ||
352         !pa_tagstruct_eof(t)) {
353         pa_context_fail(c, PA_ERR_PROTOCOL);
354         goto finish;
355     }
356
357     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel)))
358         goto finish;
359
360     if (s->state != PA_STREAM_READY)
361         goto finish;
362
363     pa_context_set_error(c, PA_ERR_KILLED);
364     pa_stream_set_state(s, PA_STREAM_FAILED);
365
366 finish:
367     pa_context_unref(c);
368 }
369
370 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
371     pa_usec_t x;
372
373     pa_assert(s);
374     pa_assert(!force_start || !force_stop);
375
376     if (!s->smoother)
377         return;
378
379     x = pa_rtclock_now();
380
381     if (s->timing_info_valid) {
382         if (aposteriori)
383             x -= s->timing_info.transport_usec;
384         else
385             x += s->timing_info.transport_usec;
386     }
387
388     if (s->suspended || s->corked || force_stop)
389         pa_smoother_pause(s->smoother, x);
390     else if (force_start || s->buffer_attr.prebuf == 0) {
391
392         if (!s->timing_info_valid &&
393             !aposteriori &&
394             !force_start &&
395             !force_stop &&
396             s->context->version >= 13) {
397
398             /* If the server supports STARTED events we take them as
399              * indications when audio really starts/stops playing, if
400              * we don't have any timing info yet -- instead of trying
401              * to be smart and guessing the server time. Otherwise the
402              * unknown transport delay add too much noise to our time
403              * calculations. */
404
405             return;
406         }
407
408         pa_smoother_resume(s->smoother, x, TRUE);
409     }
410
411     /* Please note that we have no idea if playback actually started
412      * if prebuf is non-zero! */
413 }
414
415 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
416     pa_context *c = userdata;
417     pa_stream *s;
418     uint32_t channel;
419     const char *dn;
420     pa_bool_t suspended;
421     uint32_t di;
422     pa_usec_t usec = 0;
423     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
424
425     pa_assert(pd);
426     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
427     pa_assert(t);
428     pa_assert(c);
429     pa_assert(PA_REFCNT_VALUE(c) >= 1);
430
431     pa_context_ref(c);
432
433     if (c->version < 12) {
434         pa_context_fail(c, PA_ERR_PROTOCOL);
435         goto finish;
436     }
437
438     if (pa_tagstruct_getu32(t, &channel) < 0 ||
439         pa_tagstruct_getu32(t, &di) < 0 ||
440         pa_tagstruct_gets(t, &dn) < 0 ||
441         pa_tagstruct_get_boolean(t, &suspended) < 0) {
442         pa_context_fail(c, PA_ERR_PROTOCOL);
443         goto finish;
444     }
445
446     if (c->version >= 13) {
447
448         if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
449             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
450                 pa_tagstruct_getu32(t, &fragsize) < 0 ||
451                 pa_tagstruct_get_usec(t, &usec) < 0) {
452                 pa_context_fail(c, PA_ERR_PROTOCOL);
453                 goto finish;
454             }
455         } else {
456             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
457                 pa_tagstruct_getu32(t, &tlength) < 0 ||
458                 pa_tagstruct_getu32(t, &prebuf) < 0 ||
459                 pa_tagstruct_getu32(t, &minreq) < 0 ||
460                 pa_tagstruct_get_usec(t, &usec) < 0) {
461                 pa_context_fail(c, PA_ERR_PROTOCOL);
462                 goto finish;
463             }
464         }
465     }
466
467     if (!pa_tagstruct_eof(t)) {
468         pa_context_fail(c, PA_ERR_PROTOCOL);
469         goto finish;
470     }
471
472     if (!dn || di == PA_INVALID_INDEX) {
473         pa_context_fail(c, PA_ERR_PROTOCOL);
474         goto finish;
475     }
476
477     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, channel)))
478         goto finish;
479
480     if (s->state != PA_STREAM_READY)
481         goto finish;
482
483     if (c->version >= 13) {
484         if (s->direction == PA_STREAM_RECORD)
485             s->timing_info.configured_source_usec = usec;
486         else
487             s->timing_info.configured_sink_usec = usec;
488
489         s->buffer_attr.maxlength = maxlength;
490         s->buffer_attr.fragsize = fragsize;
491         s->buffer_attr.tlength = tlength;
492         s->buffer_attr.prebuf = prebuf;
493         s->buffer_attr.minreq = minreq;
494     }
495
496     pa_xfree(s->device_name);
497     s->device_name = pa_xstrdup(dn);
498     s->device_index = di;
499
500     s->suspended = suspended;
501
502     check_smoother_status(s, TRUE, FALSE, FALSE);
503     request_auto_timing_update(s, TRUE);
504
505     if (s->moved_callback)
506         s->moved_callback(s, s->moved_userdata);
507
508 finish:
509     pa_context_unref(c);
510 }
511
512 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
513     pa_context *c = userdata;
514     pa_stream *s;
515     uint32_t channel;
516     pa_usec_t usec = 0;
517     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
518
519     pa_assert(pd);
520     pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
521     pa_assert(t);
522     pa_assert(c);
523     pa_assert(PA_REFCNT_VALUE(c) >= 1);
524
525     pa_context_ref(c);
526
527     if (c->version < 15) {
528         pa_context_fail(c, PA_ERR_PROTOCOL);
529         goto finish;
530     }
531
532     if (pa_tagstruct_getu32(t, &channel) < 0) {
533         pa_context_fail(c, PA_ERR_PROTOCOL);
534         goto finish;
535     }
536
537     if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
538         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
539             pa_tagstruct_getu32(t, &fragsize) < 0 ||
540             pa_tagstruct_get_usec(t, &usec) < 0) {
541             pa_context_fail(c, PA_ERR_PROTOCOL);
542             goto finish;
543         }
544     } else {
545         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
546             pa_tagstruct_getu32(t, &tlength) < 0 ||
547             pa_tagstruct_getu32(t, &prebuf) < 0 ||
548             pa_tagstruct_getu32(t, &minreq) < 0 ||
549             pa_tagstruct_get_usec(t, &usec) < 0) {
550             pa_context_fail(c, PA_ERR_PROTOCOL);
551             goto finish;
552         }
553     }
554
555     if (!pa_tagstruct_eof(t)) {
556         pa_context_fail(c, PA_ERR_PROTOCOL);
557         goto finish;
558     }
559
560     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, channel)))
561         goto finish;
562
563     if (s->state != PA_STREAM_READY)
564         goto finish;
565
566     if (s->direction == PA_STREAM_RECORD)
567         s->timing_info.configured_source_usec = usec;
568     else
569         s->timing_info.configured_sink_usec = usec;
570
571     s->buffer_attr.maxlength = maxlength;
572     s->buffer_attr.fragsize = fragsize;
573     s->buffer_attr.tlength = tlength;
574     s->buffer_attr.prebuf = prebuf;
575     s->buffer_attr.minreq = minreq;
576
577     request_auto_timing_update(s, TRUE);
578
579     if (s->buffer_attr_callback)
580         s->buffer_attr_callback(s, s->buffer_attr_userdata);
581
582 finish:
583     pa_context_unref(c);
584 }
585
586 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
587     pa_context *c = userdata;
588     pa_stream *s;
589     uint32_t channel;
590     pa_bool_t suspended;
591
592     pa_assert(pd);
593     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
594     pa_assert(t);
595     pa_assert(c);
596     pa_assert(PA_REFCNT_VALUE(c) >= 1);
597
598     pa_context_ref(c);
599
600     if (c->version < 12) {
601         pa_context_fail(c, PA_ERR_PROTOCOL);
602         goto finish;
603     }
604
605     if (pa_tagstruct_getu32(t, &channel) < 0 ||
606         pa_tagstruct_get_boolean(t, &suspended) < 0 ||
607         !pa_tagstruct_eof(t)) {
608         pa_context_fail(c, PA_ERR_PROTOCOL);
609         goto finish;
610     }
611
612     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, channel)))
613         goto finish;
614
615     if (s->state != PA_STREAM_READY)
616         goto finish;
617
618     s->suspended = suspended;
619
620     check_smoother_status(s, TRUE, FALSE, FALSE);
621     request_auto_timing_update(s, TRUE);
622
623     if (s->suspended_callback)
624         s->suspended_callback(s, s->suspended_userdata);
625
626 finish:
627     pa_context_unref(c);
628 }
629
630 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
631     pa_context *c = userdata;
632     pa_stream *s;
633     uint32_t channel;
634
635     pa_assert(pd);
636     pa_assert(command == PA_COMMAND_STARTED);
637     pa_assert(t);
638     pa_assert(c);
639     pa_assert(PA_REFCNT_VALUE(c) >= 1);
640
641     pa_context_ref(c);
642
643     if (c->version < 13) {
644         pa_context_fail(c, PA_ERR_PROTOCOL);
645         goto finish;
646     }
647
648     if (pa_tagstruct_getu32(t, &channel) < 0 ||
649         !pa_tagstruct_eof(t)) {
650         pa_context_fail(c, PA_ERR_PROTOCOL);
651         goto finish;
652     }
653
654     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
655         goto finish;
656
657     if (s->state != PA_STREAM_READY)
658         goto finish;
659
660     check_smoother_status(s, TRUE, TRUE, FALSE);
661     request_auto_timing_update(s, TRUE);
662
663     if (s->started_callback)
664         s->started_callback(s, s->started_userdata);
665
666 finish:
667     pa_context_unref(c);
668 }
669
670 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
671     pa_context *c = userdata;
672     pa_stream *s;
673     uint32_t channel;
674     pa_proplist *pl = NULL;
675     const char *event;
676
677     pa_assert(pd);
678     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
679     pa_assert(t);
680     pa_assert(c);
681     pa_assert(PA_REFCNT_VALUE(c) >= 1);
682
683     pa_context_ref(c);
684
685     if (c->version < 15) {
686         pa_context_fail(c, PA_ERR_PROTOCOL);
687         goto finish;
688     }
689
690     pl = pa_proplist_new();
691
692     if (pa_tagstruct_getu32(t, &channel) < 0 ||
693         pa_tagstruct_gets(t, &event) < 0 ||
694         pa_tagstruct_get_proplist(t, pl) < 0 ||
695         !pa_tagstruct_eof(t) || !event) {
696         pa_context_fail(c, PA_ERR_PROTOCOL);
697         goto finish;
698     }
699
700     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, channel)))
701         goto finish;
702
703     if (s->state != PA_STREAM_READY)
704         goto finish;
705
706     if (s->event_callback)
707         s->event_callback(s, event, pl, s->event_userdata);
708
709 finish:
710     pa_context_unref(c);
711
712     if (pl)
713         pa_proplist_free(pl);
714 }
715
716 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
717     pa_stream *s;
718     pa_context *c = userdata;
719     uint32_t bytes, channel;
720
721     pa_assert(pd);
722     pa_assert(command == PA_COMMAND_REQUEST);
723     pa_assert(t);
724     pa_assert(c);
725     pa_assert(PA_REFCNT_VALUE(c) >= 1);
726
727     pa_context_ref(c);
728
729     if (pa_tagstruct_getu32(t, &channel) < 0 ||
730         pa_tagstruct_getu32(t, &bytes) < 0 ||
731         !pa_tagstruct_eof(t)) {
732         pa_context_fail(c, PA_ERR_PROTOCOL);
733         goto finish;
734     }
735
736     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
737         goto finish;
738
739     if (s->state != PA_STREAM_READY)
740         goto finish;
741
742     s->requested_bytes += bytes;
743
744     if (s->requested_bytes > 0 && s->write_callback)
745         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
746
747 finish:
748     pa_context_unref(c);
749 }
750
751 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
752     pa_stream *s;
753     pa_context *c = userdata;
754     uint32_t channel;
755
756     pa_assert(pd);
757     pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
758     pa_assert(t);
759     pa_assert(c);
760     pa_assert(PA_REFCNT_VALUE(c) >= 1);
761
762     pa_context_ref(c);
763
764     if (pa_tagstruct_getu32(t, &channel) < 0 ||
765         !pa_tagstruct_eof(t)) {
766         pa_context_fail(c, PA_ERR_PROTOCOL);
767         goto finish;
768     }
769
770     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
771         goto finish;
772
773     if (s->state != PA_STREAM_READY)
774         goto finish;
775
776     if (s->buffer_attr.prebuf > 0)
777         check_smoother_status(s, TRUE, FALSE, TRUE);
778
779     request_auto_timing_update(s, TRUE);
780
781     if (command == PA_COMMAND_OVERFLOW) {
782         if (s->overflow_callback)
783             s->overflow_callback(s, s->overflow_userdata);
784     } else if (command == PA_COMMAND_UNDERFLOW) {
785         if (s->underflow_callback)
786             s->underflow_callback(s, s->underflow_userdata);
787     }
788
789  finish:
790     pa_context_unref(c);
791 }
792
793 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
794     pa_assert(s);
795     pa_assert(PA_REFCNT_VALUE(s) >= 1);
796
797 /*     pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
798
799     if (s->state != PA_STREAM_READY)
800         return;
801
802     if (w) {
803         s->write_index_not_before = s->context->ctag;
804
805         if (s->timing_info_valid)
806             s->timing_info.write_index_corrupt = TRUE;
807
808 /*         pa_log("write_index invalidated"); */
809     }
810
811     if (r) {
812         s->read_index_not_before = s->context->ctag;
813
814         if (s->timing_info_valid)
815             s->timing_info.read_index_corrupt = TRUE;
816
817 /*         pa_log("read_index invalidated"); */
818     }
819
820     request_auto_timing_update(s, TRUE);
821 }
822
823 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
824     pa_stream *s = userdata;
825
826     pa_assert(s);
827     pa_assert(PA_REFCNT_VALUE(s) >= 1);
828
829     pa_stream_ref(s);
830     request_auto_timing_update(s, FALSE);
831     pa_stream_unref(s);
832 }
833
834 static void create_stream_complete(pa_stream *s) {
835     pa_assert(s);
836     pa_assert(PA_REFCNT_VALUE(s) >= 1);
837     pa_assert(s->state == PA_STREAM_CREATING);
838
839     pa_stream_set_state(s, PA_STREAM_READY);
840
841     if (s->requested_bytes > 0 && s->write_callback)
842         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
843
844     if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
845         s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
846         pa_assert(!s->auto_timing_update_event);
847         s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
848
849         request_auto_timing_update(s, TRUE);
850     }
851
852     check_smoother_status(s, TRUE, FALSE, FALSE);
853 }
854
855 static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_sample_spec *ss) {
856     pa_assert(s);
857     pa_assert(attr);
858     pa_assert(ss);
859
860     if (s->context->version >= 13)
861         return;
862
863     /* Version older than 0.9.10 didn't do server side buffer_attr
864      * selection, hence we have to fake it on the client side. */
865
866     /* We choose fairly conservative values here, to not confuse
867      * old clients with extremely large playback buffers */
868
869     if (attr->maxlength == (uint32_t) -1)
870         attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
871
872     if (attr->tlength == (uint32_t) -1)
873         attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
874
875     if (attr->minreq == (uint32_t) -1)
876         attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
877
878     if (attr->prebuf == (uint32_t) -1)
879         attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
880
881     if (attr->fragsize == (uint32_t) -1)
882         attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
883 }
884
885 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
886     pa_stream *s = userdata;
887     uint32_t requested_bytes = 0;
888
889     pa_assert(pd);
890     pa_assert(s);
891     pa_assert(PA_REFCNT_VALUE(s) >= 1);
892     pa_assert(s->state == PA_STREAM_CREATING);
893
894     pa_stream_ref(s);
895
896     if (command != PA_COMMAND_REPLY) {
897         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
898             goto finish;
899
900         pa_stream_set_state(s, PA_STREAM_FAILED);
901         goto finish;
902     }
903
904     if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
905         s->channel == PA_INVALID_INDEX ||
906         ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 ||  s->stream_index == PA_INVALID_INDEX)) ||
907         ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
908         pa_context_fail(s->context, PA_ERR_PROTOCOL);
909         goto finish;
910     }
911
912     s->requested_bytes = (int64_t) requested_bytes;
913
914     if (s->context->version >= 9) {
915         if (s->direction == PA_STREAM_PLAYBACK) {
916             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
917                 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
918                 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
919                 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
920                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
921                 goto finish;
922             }
923         } else if (s->direction == PA_STREAM_RECORD) {
924             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
925                 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
926                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
927                 goto finish;
928             }
929         }
930     }
931
932     if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
933         pa_sample_spec ss;
934         pa_channel_map cm;
935         const char *dn = NULL;
936         pa_bool_t suspended;
937
938         if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
939             pa_tagstruct_get_channel_map(t, &cm) < 0 ||
940             pa_tagstruct_getu32(t, &s->device_index) < 0 ||
941             pa_tagstruct_gets(t, &dn) < 0 ||
942             pa_tagstruct_get_boolean(t, &suspended) < 0) {
943             pa_context_fail(s->context, PA_ERR_PROTOCOL);
944             goto finish;
945         }
946
947         if (!dn || s->device_index == PA_INVALID_INDEX ||
948             ss.channels != cm.channels ||
949             !pa_channel_map_valid(&cm) ||
950             !pa_sample_spec_valid(&ss) ||
951             (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
952             (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
953             (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))) {
954             pa_context_fail(s->context, PA_ERR_PROTOCOL);
955             goto finish;
956         }
957
958         pa_xfree(s->device_name);
959         s->device_name = pa_xstrdup(dn);
960         s->suspended = suspended;
961
962         s->channel_map = cm;
963         s->sample_spec = ss;
964     }
965
966     if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
967         pa_usec_t usec;
968
969         if (pa_tagstruct_get_usec(t, &usec) < 0) {
970             pa_context_fail(s->context, PA_ERR_PROTOCOL);
971             goto finish;
972         }
973
974         if (s->direction == PA_STREAM_RECORD)
975             s->timing_info.configured_source_usec = usec;
976         else
977             s->timing_info.configured_sink_usec = usec;
978     }
979
980     if (!pa_tagstruct_eof(t)) {
981         pa_context_fail(s->context, PA_ERR_PROTOCOL);
982         goto finish;
983     }
984
985     if (s->direction == PA_STREAM_RECORD) {
986         pa_assert(!s->record_memblockq);
987
988         s->record_memblockq = pa_memblockq_new(
989                 0,
990                 s->buffer_attr.maxlength,
991                 0,
992                 pa_frame_size(&s->sample_spec),
993                 1,
994                 0,
995                 0,
996                 NULL);
997     }
998
999     s->channel_valid = TRUE;
1000     pa_dynarray_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, s->channel, s);
1001
1002     create_stream_complete(s);
1003
1004 finish:
1005     pa_stream_unref(s);
1006 }
1007
1008 static int create_stream(
1009         pa_stream_direction_t direction,
1010         pa_stream *s,
1011         const char *dev,
1012         const pa_buffer_attr *attr,
1013         pa_stream_flags_t flags,
1014         const pa_cvolume *volume,
1015         pa_stream *sync_stream) {
1016
1017     pa_tagstruct *t;
1018     uint32_t tag;
1019     pa_bool_t volume_set = FALSE;
1020
1021     pa_assert(s);
1022     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1023     pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1024
1025     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1026     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1027     PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1028     PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1029                                               PA_STREAM_INTERPOLATE_TIMING|
1030                                               PA_STREAM_NOT_MONOTONIC|
1031                                               PA_STREAM_AUTO_TIMING_UPDATE|
1032                                               PA_STREAM_NO_REMAP_CHANNELS|
1033                                               PA_STREAM_NO_REMIX_CHANNELS|
1034                                               PA_STREAM_FIX_FORMAT|
1035                                               PA_STREAM_FIX_RATE|
1036                                               PA_STREAM_FIX_CHANNELS|
1037                                               PA_STREAM_DONT_MOVE|
1038                                               PA_STREAM_VARIABLE_RATE|
1039                                               PA_STREAM_PEAK_DETECT|
1040                                               PA_STREAM_START_MUTED|
1041                                               PA_STREAM_ADJUST_LATENCY|
1042                                               PA_STREAM_EARLY_REQUESTS|
1043                                               PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1044                                               PA_STREAM_START_UNMUTED|
1045                                               PA_STREAM_FAIL_ON_SUSPEND|
1046                                               PA_STREAM_RELATIVE_VOLUME)), PA_ERR_INVALID);
1047
1048     PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1049     PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1050     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1051     /* Althought some of the other flags are not supported on older
1052      * version, we don't check for them here, because it doesn't hurt
1053      * when they are passed but actually not supported. This makes
1054      * client development easier */
1055
1056     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1057     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1058     PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
1059     PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1060     PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1061
1062     pa_stream_ref(s);
1063
1064     s->direction = direction;
1065     s->flags = flags;
1066     s->corked = !!(flags & PA_STREAM_START_CORKED);
1067
1068     if (sync_stream)
1069         s->syncid = sync_stream->syncid;
1070
1071     if (attr)
1072         s->buffer_attr = *attr;
1073     automatic_buffer_attr(s, &s->buffer_attr, &s->sample_spec);
1074
1075     if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1076         pa_usec_t x;
1077
1078         x = pa_rtclock_now();
1079
1080         pa_assert(!s->smoother);
1081         s->smoother = pa_smoother_new(
1082                 SMOOTHER_ADJUST_TIME,
1083                 SMOOTHER_HISTORY_TIME,
1084                 !(flags & PA_STREAM_NOT_MONOTONIC),
1085                 TRUE,
1086                 SMOOTHER_MIN_HISTORY,
1087                 x,
1088                 TRUE);
1089     }
1090
1091     if (!dev)
1092         dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1093
1094     t = pa_tagstruct_command(
1095             s->context,
1096             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1097             &tag);
1098
1099     if (s->context->version < 13)
1100         pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1101
1102     pa_tagstruct_put(
1103             t,
1104             PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1105             PA_TAG_CHANNEL_MAP, &s->channel_map,
1106             PA_TAG_U32, PA_INVALID_INDEX,
1107             PA_TAG_STRING, dev,
1108             PA_TAG_U32, s->buffer_attr.maxlength,
1109             PA_TAG_BOOLEAN, s->corked,
1110             PA_TAG_INVALID);
1111
1112     if (s->direction == PA_STREAM_PLAYBACK) {
1113         pa_cvolume cv;
1114
1115         pa_tagstruct_put(
1116                 t,
1117                 PA_TAG_U32, s->buffer_attr.tlength,
1118                 PA_TAG_U32, s->buffer_attr.prebuf,
1119                 PA_TAG_U32, s->buffer_attr.minreq,
1120                 PA_TAG_U32, s->syncid,
1121                 PA_TAG_INVALID);
1122
1123         volume_set = !!volume;
1124
1125         if (!volume)
1126             volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1127
1128         pa_tagstruct_put_cvolume(t, volume);
1129     } else
1130         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1131
1132     if (s->context->version >= 12) {
1133         pa_tagstruct_put(
1134                 t,
1135                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1136                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1137                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1138                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1139                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1140                 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1141                 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1142                 PA_TAG_INVALID);
1143     }
1144
1145     if (s->context->version >= 13) {
1146
1147         if (s->direction == PA_STREAM_PLAYBACK)
1148             pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1149         else
1150             pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1151
1152         pa_tagstruct_put(
1153                 t,
1154                 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1155                 PA_TAG_PROPLIST, s->proplist,
1156                 PA_TAG_INVALID);
1157
1158         if (s->direction == PA_STREAM_RECORD)
1159             pa_tagstruct_putu32(t, s->direct_on_input);
1160     }
1161
1162     if (s->context->version >= 14) {
1163
1164         if (s->direction == PA_STREAM_PLAYBACK)
1165             pa_tagstruct_put_boolean(t, volume_set);
1166
1167         pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1168     }
1169
1170     if (s->context->version >= 15) {
1171
1172         if (s->direction == PA_STREAM_PLAYBACK)
1173             pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1174
1175         pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1176         pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1177     }
1178
1179     if (s->context->version >= 17) {
1180
1181         if (s->direction == PA_STREAM_PLAYBACK)
1182             pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1183
1184     }
1185
1186     pa_pstream_send_tagstruct(s->context->pstream, t);
1187     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1188
1189     pa_stream_set_state(s, PA_STREAM_CREATING);
1190
1191     pa_stream_unref(s);
1192     return 0;
1193 }
1194
1195 int pa_stream_connect_playback(
1196         pa_stream *s,
1197         const char *dev,
1198         const pa_buffer_attr *attr,
1199         pa_stream_flags_t flags,
1200         const pa_cvolume *volume,
1201         pa_stream *sync_stream) {
1202
1203     pa_assert(s);
1204     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1205
1206     return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1207 }
1208
1209 int pa_stream_connect_record(
1210         pa_stream *s,
1211         const char *dev,
1212         const pa_buffer_attr *attr,
1213         pa_stream_flags_t flags) {
1214
1215     pa_assert(s);
1216     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1217
1218     return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1219 }
1220
1221 int pa_stream_begin_write(
1222         pa_stream *s,
1223         void **data,
1224         size_t *nbytes) {
1225
1226     pa_assert(s);
1227     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1228
1229     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1230     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1231     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1232     PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1233     PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1234
1235     if (*nbytes != (size_t) -1) {
1236         size_t m, fs;
1237
1238         m = pa_mempool_block_size_max(s->context->mempool);
1239         fs = pa_frame_size(&s->sample_spec);
1240
1241         m = (m / fs) * fs;
1242         if (*nbytes > m)
1243             *nbytes = m;
1244     }
1245
1246     if (!s->write_memblock) {
1247         s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1248         s->write_data = pa_memblock_acquire(s->write_memblock);
1249     }
1250
1251     *data = s->write_data;
1252     *nbytes = pa_memblock_get_length(s->write_memblock);
1253
1254     return 0;
1255 }
1256
1257 int pa_stream_cancel_write(
1258         pa_stream *s) {
1259
1260     pa_assert(s);
1261     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1262
1263     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1264     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1265     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1266     PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1267
1268     pa_assert(s->write_data);
1269
1270     pa_memblock_release(s->write_memblock);
1271     pa_memblock_unref(s->write_memblock);
1272     s->write_memblock = NULL;
1273     s->write_data = NULL;
1274
1275     return 0;
1276 }
1277
1278 int pa_stream_write(
1279         pa_stream *s,
1280         const void *data,
1281         size_t length,
1282         pa_free_cb_t free_cb,
1283         int64_t offset,
1284         pa_seek_mode_t seek) {
1285
1286     pa_assert(s);
1287     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1288     pa_assert(data);
1289
1290     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1291     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1292     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1293     PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1294     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1295     PA_CHECK_VALIDITY(s->context,
1296                       !s->write_memblock ||
1297                       ((data >= s->write_data) &&
1298                        ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1299                       PA_ERR_INVALID);
1300     PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1301
1302     if (s->write_memblock) {
1303         pa_memchunk chunk;
1304
1305         /* pa_stream_write_begin() was called before */
1306
1307         pa_memblock_release(s->write_memblock);
1308
1309         chunk.memblock = s->write_memblock;
1310         chunk.index = (const char *) data - (const char *) s->write_data;
1311         chunk.length = length;
1312
1313         s->write_memblock = NULL;
1314         s->write_data = NULL;
1315
1316         pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1317         pa_memblock_unref(chunk.memblock);
1318
1319     } else {
1320         pa_seek_mode_t t_seek = seek;
1321         int64_t t_offset = offset;
1322         size_t t_length = length;
1323         const void *t_data = data;
1324
1325         /* pa_stream_write_begin() was not called before */
1326
1327         while (t_length > 0) {
1328             pa_memchunk chunk;
1329
1330             chunk.index = 0;
1331
1332             if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1333                 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1334                 chunk.length = t_length;
1335             } else {
1336                 void *d;
1337
1338                 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1339                 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1340
1341                 d = pa_memblock_acquire(chunk.memblock);
1342                 memcpy(d, t_data, chunk.length);
1343                 pa_memblock_release(chunk.memblock);
1344             }
1345
1346             pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1347
1348             t_offset = 0;
1349             t_seek = PA_SEEK_RELATIVE;
1350
1351             t_data = (const uint8_t*) t_data + chunk.length;
1352             t_length -= chunk.length;
1353
1354             pa_memblock_unref(chunk.memblock);
1355         }
1356
1357         if (free_cb && pa_pstream_get_shm(s->context->pstream))
1358             free_cb((void*) data);
1359     }
1360
1361     /* This is obviously wrong since we ignore the seeking index . But
1362      * that's OK, the server side applies the same error */
1363     s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1364
1365     if (s->direction == PA_STREAM_PLAYBACK) {
1366
1367         /* Update latency request correction */
1368         if (s->write_index_corrections[s->current_write_index_correction].valid) {
1369
1370             if (seek == PA_SEEK_ABSOLUTE) {
1371                 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1372                 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1373                 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1374             } else if (seek == PA_SEEK_RELATIVE) {
1375                 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1376                     s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1377             } else
1378                 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1379         }
1380
1381         /* Update the write index in the already available latency data */
1382         if (s->timing_info_valid) {
1383
1384             if (seek == PA_SEEK_ABSOLUTE) {
1385                 s->timing_info.write_index_corrupt = FALSE;
1386                 s->timing_info.write_index = offset + (int64_t) length;
1387             } else if (seek == PA_SEEK_RELATIVE) {
1388                 if (!s->timing_info.write_index_corrupt)
1389                     s->timing_info.write_index += offset + (int64_t) length;
1390             } else
1391                 s->timing_info.write_index_corrupt = TRUE;
1392         }
1393
1394         if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1395             request_auto_timing_update(s, TRUE);
1396     }
1397
1398     return 0;
1399 }
1400
1401 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1402     pa_assert(s);
1403     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1404     pa_assert(data);
1405     pa_assert(length);
1406
1407     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1408     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1409     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1410
1411     if (!s->peek_memchunk.memblock) {
1412
1413         if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1414             *data = NULL;
1415             *length = 0;
1416             return 0;
1417         }
1418
1419         s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1420     }
1421
1422     pa_assert(s->peek_data);
1423     *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1424     *length = s->peek_memchunk.length;
1425     return 0;
1426 }
1427
1428 int pa_stream_drop(pa_stream *s) {
1429     pa_assert(s);
1430     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1431
1432     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1433     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1434     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1435     PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1436
1437     pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1438
1439     /* Fix the simulated local read index */
1440     if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1441         s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1442
1443     pa_assert(s->peek_data);
1444     pa_memblock_release(s->peek_memchunk.memblock);
1445     pa_memblock_unref(s->peek_memchunk.memblock);
1446     pa_memchunk_reset(&s->peek_memchunk);
1447
1448     return 0;
1449 }
1450
1451 size_t pa_stream_writable_size(pa_stream *s) {
1452     pa_assert(s);
1453     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1454
1455     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1456     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1457     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1458
1459     return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1460 }
1461
1462 size_t pa_stream_readable_size(pa_stream *s) {
1463     pa_assert(s);
1464     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1465
1466     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1467     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1468     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1469
1470     return pa_memblockq_get_length(s->record_memblockq);
1471 }
1472
1473 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1474     pa_operation *o;
1475     pa_tagstruct *t;
1476     uint32_t tag;
1477
1478     pa_assert(s);
1479     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1480
1481     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1482     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1483     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1484
1485     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1486
1487     t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1488     pa_tagstruct_putu32(t, s->channel);
1489     pa_pstream_send_tagstruct(s->context->pstream, t);
1490     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1491
1492     return o;
1493 }
1494
1495 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1496     pa_usec_t usec;
1497
1498     pa_assert(s);
1499     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1500     pa_assert(s->state == PA_STREAM_READY);
1501     pa_assert(s->direction != PA_STREAM_UPLOAD);
1502     pa_assert(s->timing_info_valid);
1503     pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1504     pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1505
1506     if (s->direction == PA_STREAM_PLAYBACK) {
1507         /* The last byte that was written into the output device
1508          * had this time value associated */
1509         usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1510
1511         if (!s->corked && !s->suspended) {
1512
1513             if (!ignore_transport)
1514                 /* Because the latency info took a little time to come
1515                  * to us, we assume that the real output time is actually
1516                  * a little ahead */
1517                 usec += s->timing_info.transport_usec;
1518
1519             /* However, the output device usually maintains a buffer
1520                too, hence the real sample currently played is a little
1521                back  */
1522             if (s->timing_info.sink_usec >= usec)
1523                 usec = 0;
1524             else
1525                 usec -= s->timing_info.sink_usec;
1526         }
1527
1528     } else {
1529         pa_assert(s->direction == PA_STREAM_RECORD);
1530
1531         /* The last byte written into the server side queue had
1532          * this time value associated */
1533         usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1534
1535         if (!s->corked && !s->suspended) {
1536
1537             if (!ignore_transport)
1538                 /* Add transport latency */
1539                 usec += s->timing_info.transport_usec;
1540
1541             /* Add latency of data in device buffer */
1542             usec += s->timing_info.source_usec;
1543
1544             /* If this is a monitor source, we need to correct the
1545              * time by the playback device buffer */
1546             if (s->timing_info.sink_usec >= usec)
1547                 usec = 0;
1548             else
1549                 usec -= s->timing_info.sink_usec;
1550         }
1551     }
1552
1553     return usec;
1554 }
1555
1556 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1557     pa_operation *o = userdata;
1558     struct timeval local, remote, now;
1559     pa_timing_info *i;
1560     pa_bool_t playing = FALSE;
1561     uint64_t underrun_for = 0, playing_for = 0;
1562
1563     pa_assert(pd);
1564     pa_assert(o);
1565     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1566
1567     if (!o->context || !o->stream)
1568         goto finish;
1569
1570     i = &o->stream->timing_info;
1571
1572     o->stream->timing_info_valid = FALSE;
1573     i->write_index_corrupt = TRUE;
1574     i->read_index_corrupt = TRUE;
1575
1576     if (command != PA_COMMAND_REPLY) {
1577         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1578             goto finish;
1579
1580     } else {
1581
1582         if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1583             pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1584             pa_tagstruct_get_boolean(t, &playing) < 0 ||
1585             pa_tagstruct_get_timeval(t, &local) < 0 ||
1586             pa_tagstruct_get_timeval(t, &remote) < 0 ||
1587             pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1588             pa_tagstruct_gets64(t, &i->read_index) < 0) {
1589
1590             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1591             goto finish;
1592         }
1593
1594         if (o->context->version >= 13 &&
1595             o->stream->direction == PA_STREAM_PLAYBACK)
1596             if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1597                 pa_tagstruct_getu64(t, &playing_for) < 0) {
1598
1599                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1600                 goto finish;
1601             }
1602
1603
1604         if (!pa_tagstruct_eof(t)) {
1605             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1606             goto finish;
1607         }
1608         o->stream->timing_info_valid = TRUE;
1609         i->write_index_corrupt = FALSE;
1610         i->read_index_corrupt = FALSE;
1611
1612         i->playing = (int) playing;
1613         i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1614
1615         pa_gettimeofday(&now);
1616
1617         /* Calculcate timestamps */
1618         if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1619             /* local and remote seem to have synchronized clocks */
1620
1621             if (o->stream->direction == PA_STREAM_PLAYBACK)
1622                 i->transport_usec = pa_timeval_diff(&remote, &local);
1623             else
1624                 i->transport_usec = pa_timeval_diff(&now, &remote);
1625
1626             i->synchronized_clocks = TRUE;
1627             i->timestamp = remote;
1628         } else {
1629             /* clocks are not synchronized, let's estimate latency then */
1630             i->transport_usec = pa_timeval_diff(&now, &local)/2;
1631             i->synchronized_clocks = FALSE;
1632             i->timestamp = local;
1633             pa_timeval_add(&i->timestamp, i->transport_usec);
1634         }
1635
1636         /* Invalidate read and write indexes if necessary */
1637         if (tag < o->stream->read_index_not_before)
1638             i->read_index_corrupt = TRUE;
1639
1640         if (tag < o->stream->write_index_not_before)
1641             i->write_index_corrupt = TRUE;
1642
1643         if (o->stream->direction == PA_STREAM_PLAYBACK) {
1644             /* Write index correction */
1645
1646             int n, j;
1647             uint32_t ctag = tag;
1648
1649             /* Go through the saved correction values and add up the
1650              * total correction.*/
1651             for (n = 0, j = o->stream->current_write_index_correction+1;
1652                  n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1653                  n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1654
1655                 /* Step over invalid data or out-of-date data */
1656                 if (!o->stream->write_index_corrections[j].valid ||
1657                     o->stream->write_index_corrections[j].tag < ctag)
1658                     continue;
1659
1660                 /* Make sure that everything is in order */
1661                 ctag = o->stream->write_index_corrections[j].tag+1;
1662
1663                 /* Now fix the write index */
1664                 if (o->stream->write_index_corrections[j].corrupt) {
1665                     /* A corrupting seek was made */
1666                     i->write_index_corrupt = TRUE;
1667                 } else if (o->stream->write_index_corrections[j].absolute) {
1668                     /* An absolute seek was made */
1669                     i->write_index = o->stream->write_index_corrections[j].value;
1670                     i->write_index_corrupt = FALSE;
1671                 } else if (!i->write_index_corrupt) {
1672                     /* A relative seek was made */
1673                     i->write_index += o->stream->write_index_corrections[j].value;
1674                 }
1675             }
1676
1677             /* Clear old correction entries */
1678             for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1679                 if (!o->stream->write_index_corrections[n].valid)
1680                     continue;
1681
1682                 if (o->stream->write_index_corrections[n].tag <= tag)
1683                     o->stream->write_index_corrections[n].valid = FALSE;
1684             }
1685         }
1686
1687         if (o->stream->direction == PA_STREAM_RECORD) {
1688             /* Read index correction */
1689
1690             if (!i->read_index_corrupt)
1691                 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1692         }
1693
1694         /* Update smoother */
1695         if (o->stream->smoother) {
1696             pa_usec_t u, x;
1697
1698             u = x = pa_rtclock_now() - i->transport_usec;
1699
1700             if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1701                 pa_usec_t su;
1702
1703                 /* If we weren't playing then it will take some time
1704                  * until the audio will actually come out through the
1705                  * speakers. Since we follow that timing here, we need
1706                  * to try to fix this up */
1707
1708                 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1709
1710                 if (su < i->sink_usec)
1711                     x += i->sink_usec - su;
1712             }
1713
1714             if (!i->playing)
1715                 pa_smoother_pause(o->stream->smoother, x);
1716
1717             /* Update the smoother */
1718             if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1719                 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1720                 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1721
1722             if (i->playing)
1723                 pa_smoother_resume(o->stream->smoother, x, TRUE);
1724         }
1725     }
1726
1727     o->stream->auto_timing_update_requested = FALSE;
1728
1729     if (o->stream->latency_update_callback)
1730         o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1731
1732     if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1733         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1734         cb(o->stream, o->stream->timing_info_valid, o->userdata);
1735     }
1736
1737 finish:
1738
1739     pa_operation_done(o);
1740     pa_operation_unref(o);
1741 }
1742
1743 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1744     uint32_t tag;
1745     pa_operation *o;
1746     pa_tagstruct *t;
1747     struct timeval now;
1748     int cidx = 0;
1749
1750     pa_assert(s);
1751     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1752
1753     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1754     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1755     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1756
1757     if (s->direction == PA_STREAM_PLAYBACK) {
1758         /* Find a place to store the write_index correction data for this entry */
1759         cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1760
1761         /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1762         PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1763     }
1764     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1765
1766     t = pa_tagstruct_command(
1767             s->context,
1768             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1769             &tag);
1770     pa_tagstruct_putu32(t, s->channel);
1771     pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1772
1773     pa_pstream_send_tagstruct(s->context->pstream, t);
1774     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1775
1776     if (s->direction == PA_STREAM_PLAYBACK) {
1777         /* Fill in initial correction data */
1778
1779         s->current_write_index_correction = cidx;
1780
1781         s->write_index_corrections[cidx].valid = TRUE;
1782         s->write_index_corrections[cidx].absolute = FALSE;
1783         s->write_index_corrections[cidx].corrupt = FALSE;
1784         s->write_index_corrections[cidx].tag = tag;
1785         s->write_index_corrections[cidx].value = 0;
1786     }
1787
1788     return o;
1789 }
1790
1791 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1792     pa_stream *s = userdata;
1793
1794     pa_assert(pd);
1795     pa_assert(s);
1796     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1797
1798     pa_stream_ref(s);
1799
1800     if (command != PA_COMMAND_REPLY) {
1801         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1802             goto finish;
1803
1804         pa_stream_set_state(s, PA_STREAM_FAILED);
1805         goto finish;
1806     } else if (!pa_tagstruct_eof(t)) {
1807         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1808         goto finish;
1809     }
1810
1811     pa_stream_set_state(s, PA_STREAM_TERMINATED);
1812
1813 finish:
1814     pa_stream_unref(s);
1815 }
1816
1817 int pa_stream_disconnect(pa_stream *s) {
1818     pa_tagstruct *t;
1819     uint32_t tag;
1820
1821     pa_assert(s);
1822     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1823
1824     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1825     PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1826     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1827
1828     pa_stream_ref(s);
1829
1830     t = pa_tagstruct_command(
1831             s->context,
1832             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1833                         (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1834             &tag);
1835     pa_tagstruct_putu32(t, s->channel);
1836     pa_pstream_send_tagstruct(s->context->pstream, t);
1837     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1838
1839     pa_stream_unref(s);
1840     return 0;
1841 }
1842
1843 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1844     pa_assert(s);
1845     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1846
1847     if (pa_detect_fork())
1848         return;
1849
1850     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1851         return;
1852
1853     s->read_callback = cb;
1854     s->read_userdata = userdata;
1855 }
1856
1857 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1858     pa_assert(s);
1859     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1860
1861     if (pa_detect_fork())
1862         return;
1863
1864     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1865         return;
1866
1867     s->write_callback = cb;
1868     s->write_userdata = userdata;
1869 }
1870
1871 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1872     pa_assert(s);
1873     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1874
1875     if (pa_detect_fork())
1876         return;
1877
1878     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1879         return;
1880
1881     s->state_callback = cb;
1882     s->state_userdata = userdata;
1883 }
1884
1885 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1886     pa_assert(s);
1887     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1888
1889     if (pa_detect_fork())
1890         return;
1891
1892     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1893         return;
1894
1895     s->overflow_callback = cb;
1896     s->overflow_userdata = userdata;
1897 }
1898
1899 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1900     pa_assert(s);
1901     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1902
1903     if (pa_detect_fork())
1904         return;
1905
1906     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1907         return;
1908
1909     s->underflow_callback = cb;
1910     s->underflow_userdata = userdata;
1911 }
1912
1913 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1914     pa_assert(s);
1915     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1916
1917     if (pa_detect_fork())
1918         return;
1919
1920     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1921         return;
1922
1923     s->latency_update_callback = cb;
1924     s->latency_update_userdata = userdata;
1925 }
1926
1927 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1928     pa_assert(s);
1929     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1930
1931     if (pa_detect_fork())
1932         return;
1933
1934     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1935         return;
1936
1937     s->moved_callback = cb;
1938     s->moved_userdata = userdata;
1939 }
1940
1941 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1942     pa_assert(s);
1943     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1944
1945     if (pa_detect_fork())
1946         return;
1947
1948     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1949         return;
1950
1951     s->suspended_callback = cb;
1952     s->suspended_userdata = userdata;
1953 }
1954
1955 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1956     pa_assert(s);
1957     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1958
1959     if (pa_detect_fork())
1960         return;
1961
1962     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1963         return;
1964
1965     s->started_callback = cb;
1966     s->started_userdata = userdata;
1967 }
1968
1969 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
1970     pa_assert(s);
1971     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1972
1973     if (pa_detect_fork())
1974         return;
1975
1976     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1977         return;
1978
1979     s->event_callback = cb;
1980     s->event_userdata = userdata;
1981 }
1982
1983 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1984     pa_assert(s);
1985     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1986
1987     if (pa_detect_fork())
1988         return;
1989
1990     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1991         return;
1992
1993     s->buffer_attr_callback = cb;
1994     s->buffer_attr_userdata = userdata;
1995 }
1996
1997 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1998     pa_operation *o = userdata;
1999     int success = 1;
2000
2001     pa_assert(pd);
2002     pa_assert(o);
2003     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2004
2005     if (!o->context)
2006         goto finish;
2007
2008     if (command != PA_COMMAND_REPLY) {
2009         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2010             goto finish;
2011
2012         success = 0;
2013     } else if (!pa_tagstruct_eof(t)) {
2014         pa_context_fail(o->context, PA_ERR_PROTOCOL);
2015         goto finish;
2016     }
2017
2018     if (o->callback) {
2019         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2020         cb(o->stream, success, o->userdata);
2021     }
2022
2023 finish:
2024     pa_operation_done(o);
2025     pa_operation_unref(o);
2026 }
2027
2028 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2029     pa_operation *o;
2030     pa_tagstruct *t;
2031     uint32_t tag;
2032
2033     pa_assert(s);
2034     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2035
2036     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2037     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2038     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2039
2040     s->corked = b;
2041
2042     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2043
2044     t = pa_tagstruct_command(
2045             s->context,
2046             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2047             &tag);
2048     pa_tagstruct_putu32(t, s->channel);
2049     pa_tagstruct_put_boolean(t, !!b);
2050     pa_pstream_send_tagstruct(s->context->pstream, t);
2051     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2052
2053     check_smoother_status(s, FALSE, FALSE, FALSE);
2054
2055     /* This might cause the indexes to hang/start again, hence
2056      * let's request a timing update */
2057     request_auto_timing_update(s, TRUE);
2058
2059     return o;
2060 }
2061
2062 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2063     pa_tagstruct *t;
2064     pa_operation *o;
2065     uint32_t tag;
2066
2067     pa_assert(s);
2068     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2069
2070     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2071     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2072
2073     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2074
2075     t = pa_tagstruct_command(s->context, command, &tag);
2076     pa_tagstruct_putu32(t, s->channel);
2077     pa_pstream_send_tagstruct(s->context->pstream, t);
2078     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2079
2080     return o;
2081 }
2082
2083 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2084     pa_operation *o;
2085
2086     pa_assert(s);
2087     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2088
2089     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2090     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2091     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2092
2093     if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2094         return NULL;
2095
2096     if (s->direction == PA_STREAM_PLAYBACK) {
2097
2098         if (s->write_index_corrections[s->current_write_index_correction].valid)
2099             s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2100
2101         if (s->buffer_attr.prebuf > 0)
2102             check_smoother_status(s, FALSE, FALSE, TRUE);
2103
2104         /* This will change the write index, but leave the
2105          * read index untouched. */
2106         invalidate_indexes(s, FALSE, TRUE);
2107
2108     } else
2109         /* For record streams this has no influence on the write
2110          * index, but the read index might jump. */
2111         invalidate_indexes(s, TRUE, FALSE);
2112
2113     return o;
2114 }
2115
2116 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2117     pa_operation *o;
2118
2119     pa_assert(s);
2120     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2121
2122     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2123     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2124     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2125     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2126
2127     if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2128         return NULL;
2129
2130     /* This might cause the read index to hang again, hence
2131      * let's request a timing update */
2132     request_auto_timing_update(s, TRUE);
2133
2134     return o;
2135 }
2136
2137 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2138     pa_operation *o;
2139
2140     pa_assert(s);
2141     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2142
2143     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2144     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2145     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2146     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2147
2148     if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2149         return NULL;
2150
2151     /* This might cause the read index to start moving again, hence
2152      * let's request a timing update */
2153     request_auto_timing_update(s, TRUE);
2154
2155     return o;
2156 }
2157
2158 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2159     pa_operation *o;
2160
2161     pa_assert(s);
2162     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2163     pa_assert(name);
2164
2165     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2166     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2167     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2168
2169     if (s->context->version >= 13) {
2170         pa_proplist *p = pa_proplist_new();
2171
2172         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2173         o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2174         pa_proplist_free(p);
2175     } else {
2176         pa_tagstruct *t;
2177         uint32_t tag;
2178
2179         o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2180         t = pa_tagstruct_command(
2181                 s->context,
2182                 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2183                 &tag);
2184         pa_tagstruct_putu32(t, s->channel);
2185         pa_tagstruct_puts(t, name);
2186         pa_pstream_send_tagstruct(s->context->pstream, t);
2187         pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2188     }
2189
2190     return o;
2191 }
2192
2193 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2194     pa_usec_t usec;
2195
2196     pa_assert(s);
2197     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2198
2199     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2200     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2201     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2202     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2203     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2204     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2205
2206     if (s->smoother)
2207         usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2208     else
2209         usec = calc_time(s, FALSE);
2210
2211     /* Make sure the time runs monotonically */
2212     if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2213         if (usec < s->previous_time)
2214             usec = s->previous_time;
2215         else
2216             s->previous_time = usec;
2217     }
2218
2219     if (r_usec)
2220         *r_usec = usec;
2221
2222     return 0;
2223 }
2224
2225 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2226     pa_assert(s);
2227     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2228
2229     if (negative)
2230         *negative = 0;
2231
2232     if (a >= b)
2233         return a-b;
2234     else {
2235         if (negative && s->direction == PA_STREAM_RECORD) {
2236             *negative = 1;
2237             return b-a;
2238         } else
2239             return 0;
2240     }
2241 }
2242
2243 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2244     pa_usec_t t, c;
2245     int r;
2246     int64_t cindex;
2247
2248     pa_assert(s);
2249     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2250     pa_assert(r_usec);
2251
2252     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2253     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2254     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2255     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2256     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2257     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2258
2259     if ((r = pa_stream_get_time(s, &t)) < 0)
2260         return r;
2261
2262     if (s->direction == PA_STREAM_PLAYBACK)
2263         cindex = s->timing_info.write_index;
2264     else
2265         cindex = s->timing_info.read_index;
2266
2267     if (cindex < 0)
2268         cindex = 0;
2269
2270     c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2271
2272     if (s->direction == PA_STREAM_PLAYBACK)
2273         *r_usec = time_counter_diff(s, c, t, negative);
2274     else
2275         *r_usec = time_counter_diff(s, t, c, negative);
2276
2277     return 0;
2278 }
2279
2280 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2281     pa_assert(s);
2282     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2283
2284     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2285     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2286     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2287     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2288
2289     return &s->timing_info;
2290 }
2291
2292 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2293     pa_assert(s);
2294     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2295
2296     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2297
2298     return &s->sample_spec;
2299 }
2300
2301 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2302     pa_assert(s);
2303     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2304
2305     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2306
2307     return &s->channel_map;
2308 }
2309
2310 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2311     pa_assert(s);
2312     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2313
2314     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2315     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2316     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2317
2318     return &s->buffer_attr;
2319 }
2320
2321 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2322     pa_operation *o = userdata;
2323     int success = 1;
2324
2325     pa_assert(pd);
2326     pa_assert(o);
2327     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2328
2329     if (!o->context)
2330         goto finish;
2331
2332     if (command != PA_COMMAND_REPLY) {
2333         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2334             goto finish;
2335
2336         success = 0;
2337     } else {
2338         if (o->stream->direction == PA_STREAM_PLAYBACK) {
2339             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2340                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2341                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2342                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2343                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2344                 goto finish;
2345             }
2346         } else if (o->stream->direction == PA_STREAM_RECORD) {
2347             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2348                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2349                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2350                 goto finish;
2351             }
2352         }
2353
2354         if (o->stream->context->version >= 13) {
2355             pa_usec_t usec;
2356
2357             if (pa_tagstruct_get_usec(t, &usec) < 0) {
2358                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2359                 goto finish;
2360             }
2361
2362             if (o->stream->direction == PA_STREAM_RECORD)
2363                 o->stream->timing_info.configured_source_usec = usec;
2364             else
2365                 o->stream->timing_info.configured_sink_usec = usec;
2366         }
2367
2368         if (!pa_tagstruct_eof(t)) {
2369             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2370             goto finish;
2371         }
2372     }
2373
2374     if (o->callback) {
2375         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2376         cb(o->stream, success, o->userdata);
2377     }
2378
2379 finish:
2380     pa_operation_done(o);
2381     pa_operation_unref(o);
2382 }
2383
2384
2385 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2386     pa_operation *o;
2387     pa_tagstruct *t;
2388     uint32_t tag;
2389
2390     pa_assert(s);
2391     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2392     pa_assert(attr);
2393
2394     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2395     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2396     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2397     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2398
2399     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2400
2401     t = pa_tagstruct_command(
2402             s->context,
2403             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2404             &tag);
2405     pa_tagstruct_putu32(t, s->channel);
2406
2407     pa_tagstruct_putu32(t, attr->maxlength);
2408
2409     if (s->direction == PA_STREAM_PLAYBACK)
2410         pa_tagstruct_put(
2411                 t,
2412                 PA_TAG_U32, attr->tlength,
2413                 PA_TAG_U32, attr->prebuf,
2414                 PA_TAG_U32, attr->minreq,
2415                 PA_TAG_INVALID);
2416     else
2417         pa_tagstruct_putu32(t, attr->fragsize);
2418
2419     if (s->context->version >= 13)
2420         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2421
2422     if (s->context->version >= 14)
2423         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2424
2425     pa_pstream_send_tagstruct(s->context->pstream, t);
2426     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2427
2428     /* This might cause changes in the read/write indexex, hence let's
2429      * request a timing update */
2430     request_auto_timing_update(s, TRUE);
2431
2432     return o;
2433 }
2434
2435 uint32_t pa_stream_get_device_index(pa_stream *s) {
2436     pa_assert(s);
2437     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2438
2439     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2440     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2441     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2442     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2443     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2444
2445     return s->device_index;
2446 }
2447
2448 const char *pa_stream_get_device_name(pa_stream *s) {
2449     pa_assert(s);
2450     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2451
2452     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2453     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2454     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2455     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2456     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2457
2458     return s->device_name;
2459 }
2460
2461 int pa_stream_is_suspended(pa_stream *s) {
2462     pa_assert(s);
2463     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2464
2465     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2466     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2467     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2468     PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2469
2470     return s->suspended;
2471 }
2472
2473 int pa_stream_is_corked(pa_stream *s) {
2474     pa_assert(s);
2475     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2476
2477     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2478     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2479     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2480
2481     return s->corked;
2482 }
2483
2484 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2485     pa_operation *o = userdata;
2486     int success = 1;
2487
2488     pa_assert(pd);
2489     pa_assert(o);
2490     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2491
2492     if (!o->context)
2493         goto finish;
2494
2495     if (command != PA_COMMAND_REPLY) {
2496         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2497             goto finish;
2498
2499         success = 0;
2500     } else {
2501
2502         if (!pa_tagstruct_eof(t)) {
2503             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2504             goto finish;
2505         }
2506     }
2507
2508     o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2509     pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2510
2511     if (o->callback) {
2512         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2513         cb(o->stream, success, o->userdata);
2514     }
2515
2516 finish:
2517     pa_operation_done(o);
2518     pa_operation_unref(o);
2519 }
2520
2521
2522 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2523     pa_operation *o;
2524     pa_tagstruct *t;
2525     uint32_t tag;
2526
2527     pa_assert(s);
2528     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2529
2530     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2531     PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2532     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2533     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2534     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2535     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2536
2537     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2538     o->private = PA_UINT_TO_PTR(rate);
2539
2540     t = pa_tagstruct_command(
2541             s->context,
2542             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2543             &tag);
2544     pa_tagstruct_putu32(t, s->channel);
2545     pa_tagstruct_putu32(t, rate);
2546
2547     pa_pstream_send_tagstruct(s->context->pstream, t);
2548     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2549
2550     return o;
2551 }
2552
2553 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2554     pa_operation *o;
2555     pa_tagstruct *t;
2556     uint32_t tag;
2557
2558     pa_assert(s);
2559     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2560
2561     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2562     PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2563     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2564     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2565     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2566
2567     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2568
2569     t = pa_tagstruct_command(
2570             s->context,
2571             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2572             &tag);
2573     pa_tagstruct_putu32(t, s->channel);
2574     pa_tagstruct_putu32(t, (uint32_t) mode);
2575     pa_tagstruct_put_proplist(t, p);
2576
2577     pa_pstream_send_tagstruct(s->context->pstream, t);
2578     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2579
2580     /* Please note that we don't update s->proplist here, because we
2581      * don't export that field */
2582
2583     return o;
2584 }
2585
2586 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2587     pa_operation *o;
2588     pa_tagstruct *t;
2589     uint32_t tag;
2590     const char * const*k;
2591
2592     pa_assert(s);
2593     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2594
2595     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2596     PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2597     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2598     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2599     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2600
2601     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2602
2603     t = pa_tagstruct_command(
2604             s->context,
2605             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2606             &tag);
2607     pa_tagstruct_putu32(t, s->channel);
2608
2609     for (k = keys; *k; k++)
2610         pa_tagstruct_puts(t, *k);
2611
2612     pa_tagstruct_puts(t, NULL);
2613
2614     pa_pstream_send_tagstruct(s->context->pstream, t);
2615     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2616
2617     /* Please note that we don't update s->proplist here, because we
2618      * don't export that field */
2619
2620     return o;
2621 }
2622
2623 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2624     pa_assert(s);
2625     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2626
2627     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2628     PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2629     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2630     PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2631
2632     s->direct_on_input = sink_input_idx;
2633
2634     return 0;
2635 }
2636
2637 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2638     pa_assert(s);
2639     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2640
2641     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2642     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2643     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2644
2645     return s->direct_on_input;
2646 }