pulse: try to fix inaccuracy with uncork timing for streams that are created in corke...
[profile/ivi/pulseaudio.git] / src / pulse / stream.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/rtclock.h>
34 #include <pulse/xmalloc.h>
35
36 #include <pulsecore/pstream-util.h>
37 #include <pulsecore/log.h>
38 #include <pulsecore/hashmap.h>
39 #include <pulsecore/macro.h>
40 #include <pulsecore/core-rtclock.h>
41
42 #include "fork-detect.h"
43 #include "internal.h"
44
45 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
46 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
47
48 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
49 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
50 #define SMOOTHER_MIN_HISTORY (4)
51
52 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
53     return pa_stream_new_with_proplist(c, name, ss, map, NULL);
54 }
55
56 static void reset_callbacks(pa_stream *s) {
57     s->read_callback = NULL;
58     s->read_userdata = NULL;
59     s->write_callback = NULL;
60     s->write_userdata = NULL;
61     s->state_callback = NULL;
62     s->state_userdata = NULL;
63     s->overflow_callback = NULL;
64     s->overflow_userdata = NULL;
65     s->underflow_callback = NULL;
66     s->underflow_userdata = NULL;
67     s->latency_update_callback = NULL;
68     s->latency_update_userdata = NULL;
69     s->moved_callback = NULL;
70     s->moved_userdata = NULL;
71     s->suspended_callback = NULL;
72     s->suspended_userdata = NULL;
73     s->started_callback = NULL;
74     s->started_userdata = NULL;
75     s->event_callback = NULL;
76     s->event_userdata = NULL;
77     s->buffer_attr_callback = NULL;
78     s->buffer_attr_userdata = NULL;
79 }
80
81 pa_stream *pa_stream_new_with_proplist(
82         pa_context *c,
83         const char *name,
84         const pa_sample_spec *ss,
85         const pa_channel_map *map,
86         pa_proplist *p) {
87
88     pa_stream *s;
89     int i;
90     pa_channel_map tmap;
91
92     pa_assert(c);
93     pa_assert(PA_REFCNT_VALUE(c) >= 1);
94
95     PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
96     PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
97     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
98     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
99     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
100     PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
101     PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
102
103     if (!map)
104         PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
105
106     s = pa_xnew(pa_stream, 1);
107     PA_REFCNT_INIT(s);
108     s->context = c;
109     s->mainloop = c->mainloop;
110
111     s->direction = PA_STREAM_NODIRECTION;
112     s->state = PA_STREAM_UNCONNECTED;
113     s->flags = 0;
114
115     s->sample_spec = *ss;
116     s->channel_map = *map;
117
118     s->direct_on_input = PA_INVALID_INDEX;
119
120     s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
121     if (name)
122         pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
123
124     s->channel = 0;
125     s->channel_valid = FALSE;
126     s->syncid = c->csyncid++;
127     s->stream_index = PA_INVALID_INDEX;
128
129     s->requested_bytes = 0;
130     memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
131
132     /* We initialize der target length here, so that if the user
133      * passes no explicit buffering metrics the default is similar to
134      * what older PA versions provided. */
135
136     s->buffer_attr.maxlength = (uint32_t) -1;
137     s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
138     s->buffer_attr.minreq = (uint32_t) -1;
139     s->buffer_attr.prebuf = (uint32_t) -1;
140     s->buffer_attr.fragsize = (uint32_t) -1;
141
142     s->device_index = PA_INVALID_INDEX;
143     s->device_name = NULL;
144     s->suspended = FALSE;
145     s->corked = FALSE;
146
147     s->write_memblock = NULL;
148     s->write_data = NULL;
149
150     pa_memchunk_reset(&s->peek_memchunk);
151     s->peek_data = NULL;
152     s->record_memblockq = NULL;
153
154     memset(&s->timing_info, 0, sizeof(s->timing_info));
155     s->timing_info_valid = FALSE;
156
157     s->previous_time = 0;
158
159     s->read_index_not_before = 0;
160     s->write_index_not_before = 0;
161     for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
162         s->write_index_corrections[i].valid = 0;
163     s->current_write_index_correction = 0;
164
165     s->auto_timing_update_event = NULL;
166     s->auto_timing_update_requested = FALSE;
167     s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
168
169     reset_callbacks(s);
170
171     s->smoother = NULL;
172
173     /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
174     PA_LLIST_PREPEND(pa_stream, c->streams, s);
175     pa_stream_ref(s);
176
177     return s;
178 }
179
180 static void stream_unlink(pa_stream *s) {
181     pa_operation *o, *n;
182     pa_assert(s);
183
184     if (!s->context)
185         return;
186
187     /* Detach from context */
188
189     /* Unref all operatio object that point to us */
190     for (o = s->context->operations; o; o = n) {
191         n = o->next;
192
193         if (o->stream == s)
194             pa_operation_cancel(o);
195     }
196
197     /* Drop all outstanding replies for this stream */
198     if (s->context->pdispatch)
199         pa_pdispatch_unregister_reply(s->context->pdispatch, s);
200
201     if (s->channel_valid) {
202         pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL);
203         s->channel = 0;
204         s->channel_valid = FALSE;
205     }
206
207     PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
208     pa_stream_unref(s);
209
210     s->context = NULL;
211
212     if (s->auto_timing_update_event) {
213         pa_assert(s->mainloop);
214         s->mainloop->time_free(s->auto_timing_update_event);
215     }
216
217     reset_callbacks(s);
218 }
219
220 static void stream_free(pa_stream *s) {
221     pa_assert(s);
222
223     stream_unlink(s);
224
225     if (s->write_memblock) {
226         pa_memblock_release(s->write_memblock);
227         pa_memblock_unref(s->write_data);
228     }
229
230     if (s->peek_memchunk.memblock) {
231         if (s->peek_data)
232             pa_memblock_release(s->peek_memchunk.memblock);
233         pa_memblock_unref(s->peek_memchunk.memblock);
234     }
235
236     if (s->record_memblockq)
237         pa_memblockq_free(s->record_memblockq);
238
239     if (s->proplist)
240         pa_proplist_free(s->proplist);
241
242     if (s->smoother)
243         pa_smoother_free(s->smoother);
244
245     pa_xfree(s->device_name);
246     pa_xfree(s);
247 }
248
249 void pa_stream_unref(pa_stream *s) {
250     pa_assert(s);
251     pa_assert(PA_REFCNT_VALUE(s) >= 1);
252
253     if (PA_REFCNT_DEC(s) <= 0)
254         stream_free(s);
255 }
256
257 pa_stream* pa_stream_ref(pa_stream *s) {
258     pa_assert(s);
259     pa_assert(PA_REFCNT_VALUE(s) >= 1);
260
261     PA_REFCNT_INC(s);
262     return s;
263 }
264
265 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
266     pa_assert(s);
267     pa_assert(PA_REFCNT_VALUE(s) >= 1);
268
269     return s->state;
270 }
271
272 pa_context* pa_stream_get_context(pa_stream *s) {
273     pa_assert(s);
274     pa_assert(PA_REFCNT_VALUE(s) >= 1);
275
276     return s->context;
277 }
278
279 uint32_t pa_stream_get_index(pa_stream *s) {
280     pa_assert(s);
281     pa_assert(PA_REFCNT_VALUE(s) >= 1);
282
283     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
284     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
285
286     return s->stream_index;
287 }
288
289 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
290     pa_assert(s);
291     pa_assert(PA_REFCNT_VALUE(s) >= 1);
292
293     if (s->state == st)
294         return;
295
296     pa_stream_ref(s);
297
298     s->state = st;
299
300     if (s->state_callback)
301         s->state_callback(s, s->state_userdata);
302
303     if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
304         stream_unlink(s);
305
306     pa_stream_unref(s);
307 }
308
309 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
310     pa_assert(s);
311     pa_assert(PA_REFCNT_VALUE(s) >= 1);
312
313     if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
314         return;
315
316     if (s->state == PA_STREAM_READY &&
317         (force || !s->auto_timing_update_requested)) {
318         pa_operation *o;
319
320 /*         pa_log("Automatically requesting new timing data"); */
321
322         if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
323             pa_operation_unref(o);
324             s->auto_timing_update_requested = TRUE;
325         }
326     }
327
328     if (s->auto_timing_update_event) {
329         if (force)
330             s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
331
332         pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
333
334         s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
335     }
336 }
337
338 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
339     pa_context *c = userdata;
340     pa_stream *s;
341     uint32_t channel;
342
343     pa_assert(pd);
344     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
345     pa_assert(t);
346     pa_assert(c);
347     pa_assert(PA_REFCNT_VALUE(c) >= 1);
348
349     pa_context_ref(c);
350
351     if (pa_tagstruct_getu32(t, &channel) < 0 ||
352         !pa_tagstruct_eof(t)) {
353         pa_context_fail(c, PA_ERR_PROTOCOL);
354         goto finish;
355     }
356
357     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel)))
358         goto finish;
359
360     if (s->state != PA_STREAM_READY)
361         goto finish;
362
363     pa_context_set_error(c, PA_ERR_KILLED);
364     pa_stream_set_state(s, PA_STREAM_FAILED);
365
366 finish:
367     pa_context_unref(c);
368 }
369
370 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
371     pa_usec_t x;
372
373     pa_assert(s);
374     pa_assert(!force_start || !force_stop);
375
376     if (!s->smoother)
377         return;
378
379     if (!s->timing_info_valid &&
380         !aposteriori &&
381         !force_start &&
382         !force_stop &&
383         s->context->version >= 13) {
384
385         /* If the server supports STARTED and UNDERFLOW events we take
386          * them as indications when audio really starts/stops playing,
387          * if we don't have any timing info yet -- instead of trying
388          * to be smart and guessing the server time. Otherwise the
389          * unknown transport delay we don't know might add too much
390          * noise to our time calculations. */
391
392         return;
393     }
394
395     x = pa_rtclock_now();
396
397     if (s->timing_info_valid) {
398         if (aposteriori)
399             x -= s->timing_info.transport_usec;
400         else
401             x += s->timing_info.transport_usec;
402     }
403
404     if (s->suspended || s->corked || force_stop)
405         pa_smoother_pause(s->smoother, x);
406     else if (force_start || s->buffer_attr.prebuf == 0)
407         pa_smoother_resume(s->smoother, x, TRUE);
408
409     /* Please note that we have no idea if playback actually started
410      * if prebuf is non-zero! */
411 }
412
413 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
414     pa_context *c = userdata;
415     pa_stream *s;
416     uint32_t channel;
417     const char *dn;
418     pa_bool_t suspended;
419     uint32_t di;
420     pa_usec_t usec = 0;
421     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
422
423     pa_assert(pd);
424     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
425     pa_assert(t);
426     pa_assert(c);
427     pa_assert(PA_REFCNT_VALUE(c) >= 1);
428
429     pa_context_ref(c);
430
431     if (c->version < 12) {
432         pa_context_fail(c, PA_ERR_PROTOCOL);
433         goto finish;
434     }
435
436     if (pa_tagstruct_getu32(t, &channel) < 0 ||
437         pa_tagstruct_getu32(t, &di) < 0 ||
438         pa_tagstruct_gets(t, &dn) < 0 ||
439         pa_tagstruct_get_boolean(t, &suspended) < 0) {
440         pa_context_fail(c, PA_ERR_PROTOCOL);
441         goto finish;
442     }
443
444     if (c->version >= 13) {
445
446         if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
447             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
448                 pa_tagstruct_getu32(t, &fragsize) < 0 ||
449                 pa_tagstruct_get_usec(t, &usec) < 0) {
450                 pa_context_fail(c, PA_ERR_PROTOCOL);
451                 goto finish;
452             }
453         } else {
454             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
455                 pa_tagstruct_getu32(t, &tlength) < 0 ||
456                 pa_tagstruct_getu32(t, &prebuf) < 0 ||
457                 pa_tagstruct_getu32(t, &minreq) < 0 ||
458                 pa_tagstruct_get_usec(t, &usec) < 0) {
459                 pa_context_fail(c, PA_ERR_PROTOCOL);
460                 goto finish;
461             }
462         }
463     }
464
465     if (!pa_tagstruct_eof(t)) {
466         pa_context_fail(c, PA_ERR_PROTOCOL);
467         goto finish;
468     }
469
470     if (!dn || di == PA_INVALID_INDEX) {
471         pa_context_fail(c, PA_ERR_PROTOCOL);
472         goto finish;
473     }
474
475     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, channel)))
476         goto finish;
477
478     if (s->state != PA_STREAM_READY)
479         goto finish;
480
481     if (c->version >= 13) {
482         if (s->direction == PA_STREAM_RECORD)
483             s->timing_info.configured_source_usec = usec;
484         else
485             s->timing_info.configured_sink_usec = usec;
486
487         s->buffer_attr.maxlength = maxlength;
488         s->buffer_attr.fragsize = fragsize;
489         s->buffer_attr.tlength = tlength;
490         s->buffer_attr.prebuf = prebuf;
491         s->buffer_attr.minreq = minreq;
492     }
493
494     pa_xfree(s->device_name);
495     s->device_name = pa_xstrdup(dn);
496     s->device_index = di;
497
498     s->suspended = suspended;
499
500     check_smoother_status(s, TRUE, FALSE, FALSE);
501     request_auto_timing_update(s, TRUE);
502
503     if (s->moved_callback)
504         s->moved_callback(s, s->moved_userdata);
505
506 finish:
507     pa_context_unref(c);
508 }
509
510 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
511     pa_context *c = userdata;
512     pa_stream *s;
513     uint32_t channel;
514     pa_usec_t usec = 0;
515     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
516
517     pa_assert(pd);
518     pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
519     pa_assert(t);
520     pa_assert(c);
521     pa_assert(PA_REFCNT_VALUE(c) >= 1);
522
523     pa_context_ref(c);
524
525     if (c->version < 15) {
526         pa_context_fail(c, PA_ERR_PROTOCOL);
527         goto finish;
528     }
529
530     if (pa_tagstruct_getu32(t, &channel) < 0) {
531         pa_context_fail(c, PA_ERR_PROTOCOL);
532         goto finish;
533     }
534
535     if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
536         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
537             pa_tagstruct_getu32(t, &fragsize) < 0 ||
538             pa_tagstruct_get_usec(t, &usec) < 0) {
539             pa_context_fail(c, PA_ERR_PROTOCOL);
540             goto finish;
541         }
542     } else {
543         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
544             pa_tagstruct_getu32(t, &tlength) < 0 ||
545             pa_tagstruct_getu32(t, &prebuf) < 0 ||
546             pa_tagstruct_getu32(t, &minreq) < 0 ||
547             pa_tagstruct_get_usec(t, &usec) < 0) {
548             pa_context_fail(c, PA_ERR_PROTOCOL);
549             goto finish;
550         }
551     }
552
553     if (!pa_tagstruct_eof(t)) {
554         pa_context_fail(c, PA_ERR_PROTOCOL);
555         goto finish;
556     }
557
558     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, channel)))
559         goto finish;
560
561     if (s->state != PA_STREAM_READY)
562         goto finish;
563
564     if (s->direction == PA_STREAM_RECORD)
565         s->timing_info.configured_source_usec = usec;
566     else
567         s->timing_info.configured_sink_usec = usec;
568
569     s->buffer_attr.maxlength = maxlength;
570     s->buffer_attr.fragsize = fragsize;
571     s->buffer_attr.tlength = tlength;
572     s->buffer_attr.prebuf = prebuf;
573     s->buffer_attr.minreq = minreq;
574
575     request_auto_timing_update(s, TRUE);
576
577     if (s->buffer_attr_callback)
578         s->buffer_attr_callback(s, s->buffer_attr_userdata);
579
580 finish:
581     pa_context_unref(c);
582 }
583
584 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
585     pa_context *c = userdata;
586     pa_stream *s;
587     uint32_t channel;
588     pa_bool_t suspended;
589
590     pa_assert(pd);
591     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
592     pa_assert(t);
593     pa_assert(c);
594     pa_assert(PA_REFCNT_VALUE(c) >= 1);
595
596     pa_context_ref(c);
597
598     if (c->version < 12) {
599         pa_context_fail(c, PA_ERR_PROTOCOL);
600         goto finish;
601     }
602
603     if (pa_tagstruct_getu32(t, &channel) < 0 ||
604         pa_tagstruct_get_boolean(t, &suspended) < 0 ||
605         !pa_tagstruct_eof(t)) {
606         pa_context_fail(c, PA_ERR_PROTOCOL);
607         goto finish;
608     }
609
610     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, channel)))
611         goto finish;
612
613     if (s->state != PA_STREAM_READY)
614         goto finish;
615
616     s->suspended = suspended;
617
618     check_smoother_status(s, TRUE, FALSE, FALSE);
619     request_auto_timing_update(s, TRUE);
620
621     if (s->suspended_callback)
622         s->suspended_callback(s, s->suspended_userdata);
623
624 finish:
625     pa_context_unref(c);
626 }
627
628 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
629     pa_context *c = userdata;
630     pa_stream *s;
631     uint32_t channel;
632
633     pa_assert(pd);
634     pa_assert(command == PA_COMMAND_STARTED);
635     pa_assert(t);
636     pa_assert(c);
637     pa_assert(PA_REFCNT_VALUE(c) >= 1);
638
639     pa_context_ref(c);
640
641     if (c->version < 13) {
642         pa_context_fail(c, PA_ERR_PROTOCOL);
643         goto finish;
644     }
645
646     if (pa_tagstruct_getu32(t, &channel) < 0 ||
647         !pa_tagstruct_eof(t)) {
648         pa_context_fail(c, PA_ERR_PROTOCOL);
649         goto finish;
650     }
651
652     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
653         goto finish;
654
655     if (s->state != PA_STREAM_READY)
656         goto finish;
657
658     check_smoother_status(s, TRUE, TRUE, FALSE);
659     request_auto_timing_update(s, TRUE);
660
661     if (s->started_callback)
662         s->started_callback(s, s->started_userdata);
663
664 finish:
665     pa_context_unref(c);
666 }
667
668 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
669     pa_context *c = userdata;
670     pa_stream *s;
671     uint32_t channel;
672     pa_proplist *pl = NULL;
673     const char *event;
674
675     pa_assert(pd);
676     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
677     pa_assert(t);
678     pa_assert(c);
679     pa_assert(PA_REFCNT_VALUE(c) >= 1);
680
681     pa_context_ref(c);
682
683     if (c->version < 15) {
684         pa_context_fail(c, PA_ERR_PROTOCOL);
685         goto finish;
686     }
687
688     pl = pa_proplist_new();
689
690     if (pa_tagstruct_getu32(t, &channel) < 0 ||
691         pa_tagstruct_gets(t, &event) < 0 ||
692         pa_tagstruct_get_proplist(t, pl) < 0 ||
693         !pa_tagstruct_eof(t) || !event) {
694         pa_context_fail(c, PA_ERR_PROTOCOL);
695         goto finish;
696     }
697
698     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, channel)))
699         goto finish;
700
701     if (s->state != PA_STREAM_READY)
702         goto finish;
703
704     if (s->event_callback)
705         s->event_callback(s, event, pl, s->event_userdata);
706
707 finish:
708     pa_context_unref(c);
709
710     if (pl)
711         pa_proplist_free(pl);
712 }
713
714 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
715     pa_stream *s;
716     pa_context *c = userdata;
717     uint32_t bytes, channel;
718
719     pa_assert(pd);
720     pa_assert(command == PA_COMMAND_REQUEST);
721     pa_assert(t);
722     pa_assert(c);
723     pa_assert(PA_REFCNT_VALUE(c) >= 1);
724
725     pa_context_ref(c);
726
727     if (pa_tagstruct_getu32(t, &channel) < 0 ||
728         pa_tagstruct_getu32(t, &bytes) < 0 ||
729         !pa_tagstruct_eof(t)) {
730         pa_context_fail(c, PA_ERR_PROTOCOL);
731         goto finish;
732     }
733
734     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
735         goto finish;
736
737     if (s->state != PA_STREAM_READY)
738         goto finish;
739
740     s->requested_bytes += bytes;
741
742     if (s->requested_bytes > 0 && s->write_callback)
743         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
744
745 finish:
746     pa_context_unref(c);
747 }
748
749 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
750     pa_stream *s;
751     pa_context *c = userdata;
752     uint32_t channel;
753
754     pa_assert(pd);
755     pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
756     pa_assert(t);
757     pa_assert(c);
758     pa_assert(PA_REFCNT_VALUE(c) >= 1);
759
760     pa_context_ref(c);
761
762     if (pa_tagstruct_getu32(t, &channel) < 0 ||
763         !pa_tagstruct_eof(t)) {
764         pa_context_fail(c, PA_ERR_PROTOCOL);
765         goto finish;
766     }
767
768     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
769         goto finish;
770
771     if (s->state != PA_STREAM_READY)
772         goto finish;
773
774     if (s->buffer_attr.prebuf > 0)
775         check_smoother_status(s, TRUE, FALSE, TRUE);
776
777     request_auto_timing_update(s, TRUE);
778
779     if (command == PA_COMMAND_OVERFLOW) {
780         if (s->overflow_callback)
781             s->overflow_callback(s, s->overflow_userdata);
782     } else if (command == PA_COMMAND_UNDERFLOW) {
783         if (s->underflow_callback)
784             s->underflow_callback(s, s->underflow_userdata);
785     }
786
787  finish:
788     pa_context_unref(c);
789 }
790
791 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
792     pa_assert(s);
793     pa_assert(PA_REFCNT_VALUE(s) >= 1);
794
795 /*     pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
796
797     if (s->state != PA_STREAM_READY)
798         return;
799
800     if (w) {
801         s->write_index_not_before = s->context->ctag;
802
803         if (s->timing_info_valid)
804             s->timing_info.write_index_corrupt = TRUE;
805
806 /*         pa_log("write_index invalidated"); */
807     }
808
809     if (r) {
810         s->read_index_not_before = s->context->ctag;
811
812         if (s->timing_info_valid)
813             s->timing_info.read_index_corrupt = TRUE;
814
815 /*         pa_log("read_index invalidated"); */
816     }
817
818     request_auto_timing_update(s, TRUE);
819 }
820
821 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
822     pa_stream *s = userdata;
823
824     pa_assert(s);
825     pa_assert(PA_REFCNT_VALUE(s) >= 1);
826
827     pa_stream_ref(s);
828     request_auto_timing_update(s, FALSE);
829     pa_stream_unref(s);
830 }
831
832 static void create_stream_complete(pa_stream *s) {
833     pa_assert(s);
834     pa_assert(PA_REFCNT_VALUE(s) >= 1);
835     pa_assert(s->state == PA_STREAM_CREATING);
836
837     pa_stream_set_state(s, PA_STREAM_READY);
838
839     if (s->requested_bytes > 0 && s->write_callback)
840         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
841
842     if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
843         s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
844         pa_assert(!s->auto_timing_update_event);
845         s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
846
847         request_auto_timing_update(s, TRUE);
848     }
849
850     check_smoother_status(s, TRUE, FALSE, FALSE);
851 }
852
853 static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_sample_spec *ss) {
854     pa_assert(s);
855     pa_assert(attr);
856     pa_assert(ss);
857
858     if (s->context->version >= 13)
859         return;
860
861     /* Version older than 0.9.10 didn't do server side buffer_attr
862      * selection, hence we have to fake it on the client side. */
863
864     /* We choose fairly conservative values here, to not confuse
865      * old clients with extremely large playback buffers */
866
867     if (attr->maxlength == (uint32_t) -1)
868         attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
869
870     if (attr->tlength == (uint32_t) -1)
871         attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
872
873     if (attr->minreq == (uint32_t) -1)
874         attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
875
876     if (attr->prebuf == (uint32_t) -1)
877         attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
878
879     if (attr->fragsize == (uint32_t) -1)
880         attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
881 }
882
883 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
884     pa_stream *s = userdata;
885     uint32_t requested_bytes = 0;
886
887     pa_assert(pd);
888     pa_assert(s);
889     pa_assert(PA_REFCNT_VALUE(s) >= 1);
890     pa_assert(s->state == PA_STREAM_CREATING);
891
892     pa_stream_ref(s);
893
894     if (command != PA_COMMAND_REPLY) {
895         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
896             goto finish;
897
898         pa_stream_set_state(s, PA_STREAM_FAILED);
899         goto finish;
900     }
901
902     if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
903         s->channel == PA_INVALID_INDEX ||
904         ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 ||  s->stream_index == PA_INVALID_INDEX)) ||
905         ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
906         pa_context_fail(s->context, PA_ERR_PROTOCOL);
907         goto finish;
908     }
909
910     s->requested_bytes = (int64_t) requested_bytes;
911
912     if (s->context->version >= 9) {
913         if (s->direction == PA_STREAM_PLAYBACK) {
914             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
915                 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
916                 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
917                 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
918                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
919                 goto finish;
920             }
921         } else if (s->direction == PA_STREAM_RECORD) {
922             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
923                 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
924                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
925                 goto finish;
926             }
927         }
928     }
929
930     if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
931         pa_sample_spec ss;
932         pa_channel_map cm;
933         const char *dn = NULL;
934         pa_bool_t suspended;
935
936         if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
937             pa_tagstruct_get_channel_map(t, &cm) < 0 ||
938             pa_tagstruct_getu32(t, &s->device_index) < 0 ||
939             pa_tagstruct_gets(t, &dn) < 0 ||
940             pa_tagstruct_get_boolean(t, &suspended) < 0) {
941             pa_context_fail(s->context, PA_ERR_PROTOCOL);
942             goto finish;
943         }
944
945         if (!dn || s->device_index == PA_INVALID_INDEX ||
946             ss.channels != cm.channels ||
947             !pa_channel_map_valid(&cm) ||
948             !pa_sample_spec_valid(&ss) ||
949             (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
950             (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
951             (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))) {
952             pa_context_fail(s->context, PA_ERR_PROTOCOL);
953             goto finish;
954         }
955
956         pa_xfree(s->device_name);
957         s->device_name = pa_xstrdup(dn);
958         s->suspended = suspended;
959
960         s->channel_map = cm;
961         s->sample_spec = ss;
962     }
963
964     if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
965         pa_usec_t usec;
966
967         if (pa_tagstruct_get_usec(t, &usec) < 0) {
968             pa_context_fail(s->context, PA_ERR_PROTOCOL);
969             goto finish;
970         }
971
972         if (s->direction == PA_STREAM_RECORD)
973             s->timing_info.configured_source_usec = usec;
974         else
975             s->timing_info.configured_sink_usec = usec;
976     }
977
978     if (!pa_tagstruct_eof(t)) {
979         pa_context_fail(s->context, PA_ERR_PROTOCOL);
980         goto finish;
981     }
982
983     if (s->direction == PA_STREAM_RECORD) {
984         pa_assert(!s->record_memblockq);
985
986         s->record_memblockq = pa_memblockq_new(
987                 0,
988                 s->buffer_attr.maxlength,
989                 0,
990                 pa_frame_size(&s->sample_spec),
991                 1,
992                 0,
993                 0,
994                 NULL);
995     }
996
997     s->channel_valid = TRUE;
998     pa_dynarray_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, s->channel, s);
999
1000     create_stream_complete(s);
1001
1002 finish:
1003     pa_stream_unref(s);
1004 }
1005
1006 static int create_stream(
1007         pa_stream_direction_t direction,
1008         pa_stream *s,
1009         const char *dev,
1010         const pa_buffer_attr *attr,
1011         pa_stream_flags_t flags,
1012         const pa_cvolume *volume,
1013         pa_stream *sync_stream) {
1014
1015     pa_tagstruct *t;
1016     uint32_t tag;
1017     pa_bool_t volume_set = FALSE;
1018
1019     pa_assert(s);
1020     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1021     pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1022
1023     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1024     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1025     PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1026     PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1027                                               PA_STREAM_INTERPOLATE_TIMING|
1028                                               PA_STREAM_NOT_MONOTONIC|
1029                                               PA_STREAM_AUTO_TIMING_UPDATE|
1030                                               PA_STREAM_NO_REMAP_CHANNELS|
1031                                               PA_STREAM_NO_REMIX_CHANNELS|
1032                                               PA_STREAM_FIX_FORMAT|
1033                                               PA_STREAM_FIX_RATE|
1034                                               PA_STREAM_FIX_CHANNELS|
1035                                               PA_STREAM_DONT_MOVE|
1036                                               PA_STREAM_VARIABLE_RATE|
1037                                               PA_STREAM_PEAK_DETECT|
1038                                               PA_STREAM_START_MUTED|
1039                                               PA_STREAM_ADJUST_LATENCY|
1040                                               PA_STREAM_EARLY_REQUESTS|
1041                                               PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1042                                               PA_STREAM_START_UNMUTED|
1043                                               PA_STREAM_FAIL_ON_SUSPEND|
1044                                               PA_STREAM_RELATIVE_VOLUME)), PA_ERR_INVALID);
1045
1046     PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1047     PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1048     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1049     /* Althought some of the other flags are not supported on older
1050      * version, we don't check for them here, because it doesn't hurt
1051      * when they are passed but actually not supported. This makes
1052      * client development easier */
1053
1054     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1055     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1056     PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
1057     PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1058     PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1059
1060     pa_stream_ref(s);
1061
1062     s->direction = direction;
1063     s->flags = flags;
1064     s->corked = !!(flags & PA_STREAM_START_CORKED);
1065
1066     if (sync_stream)
1067         s->syncid = sync_stream->syncid;
1068
1069     if (attr)
1070         s->buffer_attr = *attr;
1071     automatic_buffer_attr(s, &s->buffer_attr, &s->sample_spec);
1072
1073     if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1074         pa_usec_t x;
1075
1076         x = pa_rtclock_now();
1077
1078         pa_assert(!s->smoother);
1079         s->smoother = pa_smoother_new(
1080                 SMOOTHER_ADJUST_TIME,
1081                 SMOOTHER_HISTORY_TIME,
1082                 !(flags & PA_STREAM_NOT_MONOTONIC),
1083                 TRUE,
1084                 SMOOTHER_MIN_HISTORY,
1085                 x,
1086                 TRUE);
1087     }
1088
1089     if (!dev)
1090         dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1091
1092     t = pa_tagstruct_command(
1093             s->context,
1094             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1095             &tag);
1096
1097     if (s->context->version < 13)
1098         pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1099
1100     pa_tagstruct_put(
1101             t,
1102             PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1103             PA_TAG_CHANNEL_MAP, &s->channel_map,
1104             PA_TAG_U32, PA_INVALID_INDEX,
1105             PA_TAG_STRING, dev,
1106             PA_TAG_U32, s->buffer_attr.maxlength,
1107             PA_TAG_BOOLEAN, s->corked,
1108             PA_TAG_INVALID);
1109
1110     if (s->direction == PA_STREAM_PLAYBACK) {
1111         pa_cvolume cv;
1112
1113         pa_tagstruct_put(
1114                 t,
1115                 PA_TAG_U32, s->buffer_attr.tlength,
1116                 PA_TAG_U32, s->buffer_attr.prebuf,
1117                 PA_TAG_U32, s->buffer_attr.minreq,
1118                 PA_TAG_U32, s->syncid,
1119                 PA_TAG_INVALID);
1120
1121         volume_set = !!volume;
1122
1123         if (!volume)
1124             volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1125
1126         pa_tagstruct_put_cvolume(t, volume);
1127     } else
1128         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1129
1130     if (s->context->version >= 12) {
1131         pa_tagstruct_put(
1132                 t,
1133                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1134                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1135                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1136                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1137                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1138                 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1139                 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1140                 PA_TAG_INVALID);
1141     }
1142
1143     if (s->context->version >= 13) {
1144
1145         if (s->direction == PA_STREAM_PLAYBACK)
1146             pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1147         else
1148             pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1149
1150         pa_tagstruct_put(
1151                 t,
1152                 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1153                 PA_TAG_PROPLIST, s->proplist,
1154                 PA_TAG_INVALID);
1155
1156         if (s->direction == PA_STREAM_RECORD)
1157             pa_tagstruct_putu32(t, s->direct_on_input);
1158     }
1159
1160     if (s->context->version >= 14) {
1161
1162         if (s->direction == PA_STREAM_PLAYBACK)
1163             pa_tagstruct_put_boolean(t, volume_set);
1164
1165         pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1166     }
1167
1168     if (s->context->version >= 15) {
1169
1170         if (s->direction == PA_STREAM_PLAYBACK)
1171             pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1172
1173         pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1174         pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1175     }
1176
1177     if (s->context->version >= 17) {
1178
1179         if (s->direction == PA_STREAM_PLAYBACK)
1180             pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1181
1182     }
1183
1184     pa_pstream_send_tagstruct(s->context->pstream, t);
1185     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1186
1187     pa_stream_set_state(s, PA_STREAM_CREATING);
1188
1189     pa_stream_unref(s);
1190     return 0;
1191 }
1192
1193 int pa_stream_connect_playback(
1194         pa_stream *s,
1195         const char *dev,
1196         const pa_buffer_attr *attr,
1197         pa_stream_flags_t flags,
1198         const pa_cvolume *volume,
1199         pa_stream *sync_stream) {
1200
1201     pa_assert(s);
1202     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1203
1204     return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1205 }
1206
1207 int pa_stream_connect_record(
1208         pa_stream *s,
1209         const char *dev,
1210         const pa_buffer_attr *attr,
1211         pa_stream_flags_t flags) {
1212
1213     pa_assert(s);
1214     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1215
1216     return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1217 }
1218
1219 int pa_stream_begin_write(
1220         pa_stream *s,
1221         void **data,
1222         size_t *nbytes) {
1223
1224     pa_assert(s);
1225     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1226
1227     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1228     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1229     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1230     PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1231     PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1232
1233     if (*nbytes != (size_t) -1) {
1234         size_t m, fs;
1235
1236         m = pa_mempool_block_size_max(s->context->mempool);
1237         fs = pa_frame_size(&s->sample_spec);
1238
1239         m = (m / fs) * fs;
1240         if (*nbytes > m)
1241             *nbytes = m;
1242     }
1243
1244     if (!s->write_memblock) {
1245         s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1246         s->write_data = pa_memblock_acquire(s->write_memblock);
1247     }
1248
1249     *data = s->write_data;
1250     *nbytes = pa_memblock_get_length(s->write_memblock);
1251
1252     return 0;
1253 }
1254
1255 int pa_stream_cancel_write(
1256         pa_stream *s) {
1257
1258     pa_assert(s);
1259     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1260
1261     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1262     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1263     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1264     PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1265
1266     pa_assert(s->write_data);
1267
1268     pa_memblock_release(s->write_memblock);
1269     pa_memblock_unref(s->write_memblock);
1270     s->write_memblock = NULL;
1271     s->write_data = NULL;
1272
1273     return 0;
1274 }
1275
1276 int pa_stream_write(
1277         pa_stream *s,
1278         const void *data,
1279         size_t length,
1280         pa_free_cb_t free_cb,
1281         int64_t offset,
1282         pa_seek_mode_t seek) {
1283
1284     pa_assert(s);
1285     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1286     pa_assert(data);
1287
1288     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1289     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1290     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1291     PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1292     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1293     PA_CHECK_VALIDITY(s->context,
1294                       !s->write_memblock ||
1295                       ((data >= s->write_data) &&
1296                        ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1297                       PA_ERR_INVALID);
1298     PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1299
1300     if (s->write_memblock) {
1301         pa_memchunk chunk;
1302
1303         /* pa_stream_write_begin() was called before */
1304
1305         pa_memblock_release(s->write_memblock);
1306
1307         chunk.memblock = s->write_memblock;
1308         chunk.index = (const char *) data - (const char *) s->write_data;
1309         chunk.length = length;
1310
1311         s->write_memblock = NULL;
1312         s->write_data = NULL;
1313
1314         pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1315         pa_memblock_unref(chunk.memblock);
1316
1317     } else {
1318         pa_seek_mode_t t_seek = seek;
1319         int64_t t_offset = offset;
1320         size_t t_length = length;
1321         const void *t_data = data;
1322
1323         /* pa_stream_write_begin() was not called before */
1324
1325         while (t_length > 0) {
1326             pa_memchunk chunk;
1327
1328             chunk.index = 0;
1329
1330             if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1331                 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1332                 chunk.length = t_length;
1333             } else {
1334                 void *d;
1335
1336                 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1337                 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1338
1339                 d = pa_memblock_acquire(chunk.memblock);
1340                 memcpy(d, t_data, chunk.length);
1341                 pa_memblock_release(chunk.memblock);
1342             }
1343
1344             pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1345
1346             t_offset = 0;
1347             t_seek = PA_SEEK_RELATIVE;
1348
1349             t_data = (const uint8_t*) t_data + chunk.length;
1350             t_length -= chunk.length;
1351
1352             pa_memblock_unref(chunk.memblock);
1353         }
1354
1355         if (free_cb && pa_pstream_get_shm(s->context->pstream))
1356             free_cb((void*) data);
1357     }
1358
1359     /* This is obviously wrong since we ignore the seeking index . But
1360      * that's OK, the server side applies the same error */
1361     s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1362
1363     if (s->direction == PA_STREAM_PLAYBACK) {
1364
1365         /* Update latency request correction */
1366         if (s->write_index_corrections[s->current_write_index_correction].valid) {
1367
1368             if (seek == PA_SEEK_ABSOLUTE) {
1369                 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1370                 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1371                 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1372             } else if (seek == PA_SEEK_RELATIVE) {
1373                 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1374                     s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1375             } else
1376                 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1377         }
1378
1379         /* Update the write index in the already available latency data */
1380         if (s->timing_info_valid) {
1381
1382             if (seek == PA_SEEK_ABSOLUTE) {
1383                 s->timing_info.write_index_corrupt = FALSE;
1384                 s->timing_info.write_index = offset + (int64_t) length;
1385             } else if (seek == PA_SEEK_RELATIVE) {
1386                 if (!s->timing_info.write_index_corrupt)
1387                     s->timing_info.write_index += offset + (int64_t) length;
1388             } else
1389                 s->timing_info.write_index_corrupt = TRUE;
1390         }
1391
1392         if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1393             request_auto_timing_update(s, TRUE);
1394     }
1395
1396     return 0;
1397 }
1398
1399 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1400     pa_assert(s);
1401     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1402     pa_assert(data);
1403     pa_assert(length);
1404
1405     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1406     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1407     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1408
1409     if (!s->peek_memchunk.memblock) {
1410
1411         if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1412             *data = NULL;
1413             *length = 0;
1414             return 0;
1415         }
1416
1417         s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1418     }
1419
1420     pa_assert(s->peek_data);
1421     *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1422     *length = s->peek_memchunk.length;
1423     return 0;
1424 }
1425
1426 int pa_stream_drop(pa_stream *s) {
1427     pa_assert(s);
1428     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1429
1430     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1431     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1432     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1433     PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1434
1435     pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1436
1437     /* Fix the simulated local read index */
1438     if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1439         s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1440
1441     pa_assert(s->peek_data);
1442     pa_memblock_release(s->peek_memchunk.memblock);
1443     pa_memblock_unref(s->peek_memchunk.memblock);
1444     pa_memchunk_reset(&s->peek_memchunk);
1445
1446     return 0;
1447 }
1448
1449 size_t pa_stream_writable_size(pa_stream *s) {
1450     pa_assert(s);
1451     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1452
1453     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1454     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1455     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1456
1457     return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1458 }
1459
1460 size_t pa_stream_readable_size(pa_stream *s) {
1461     pa_assert(s);
1462     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1463
1464     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1465     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1466     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1467
1468     return pa_memblockq_get_length(s->record_memblockq);
1469 }
1470
1471 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1472     pa_operation *o;
1473     pa_tagstruct *t;
1474     uint32_t tag;
1475
1476     pa_assert(s);
1477     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1478
1479     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1480     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1481     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1482
1483     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1484
1485     t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1486     pa_tagstruct_putu32(t, s->channel);
1487     pa_pstream_send_tagstruct(s->context->pstream, t);
1488     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1489
1490     return o;
1491 }
1492
1493 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1494     pa_usec_t usec;
1495
1496     pa_assert(s);
1497     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1498     pa_assert(s->state == PA_STREAM_READY);
1499     pa_assert(s->direction != PA_STREAM_UPLOAD);
1500     pa_assert(s->timing_info_valid);
1501     pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1502     pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1503
1504     if (s->direction == PA_STREAM_PLAYBACK) {
1505         /* The last byte that was written into the output device
1506          * had this time value associated */
1507         usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1508
1509         if (!s->corked && !s->suspended) {
1510
1511             if (!ignore_transport)
1512                 /* Because the latency info took a little time to come
1513                  * to us, we assume that the real output time is actually
1514                  * a little ahead */
1515                 usec += s->timing_info.transport_usec;
1516
1517             /* However, the output device usually maintains a buffer
1518                too, hence the real sample currently played is a little
1519                back  */
1520             if (s->timing_info.sink_usec >= usec)
1521                 usec = 0;
1522             else
1523                 usec -= s->timing_info.sink_usec;
1524         }
1525
1526     } else {
1527         pa_assert(s->direction == PA_STREAM_RECORD);
1528
1529         /* The last byte written into the server side queue had
1530          * this time value associated */
1531         usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1532
1533         if (!s->corked && !s->suspended) {
1534
1535             if (!ignore_transport)
1536                 /* Add transport latency */
1537                 usec += s->timing_info.transport_usec;
1538
1539             /* Add latency of data in device buffer */
1540             usec += s->timing_info.source_usec;
1541
1542             /* If this is a monitor source, we need to correct the
1543              * time by the playback device buffer */
1544             if (s->timing_info.sink_usec >= usec)
1545                 usec = 0;
1546             else
1547                 usec -= s->timing_info.sink_usec;
1548         }
1549     }
1550
1551     return usec;
1552 }
1553
1554 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1555     pa_operation *o = userdata;
1556     struct timeval local, remote, now;
1557     pa_timing_info *i;
1558     pa_bool_t playing = FALSE;
1559     uint64_t underrun_for = 0, playing_for = 0;
1560
1561     pa_assert(pd);
1562     pa_assert(o);
1563     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1564
1565     if (!o->context || !o->stream)
1566         goto finish;
1567
1568     i = &o->stream->timing_info;
1569
1570     o->stream->timing_info_valid = FALSE;
1571     i->write_index_corrupt = TRUE;
1572     i->read_index_corrupt = TRUE;
1573
1574     if (command != PA_COMMAND_REPLY) {
1575         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1576             goto finish;
1577
1578     } else {
1579
1580         if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1581             pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1582             pa_tagstruct_get_boolean(t, &playing) < 0 ||
1583             pa_tagstruct_get_timeval(t, &local) < 0 ||
1584             pa_tagstruct_get_timeval(t, &remote) < 0 ||
1585             pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1586             pa_tagstruct_gets64(t, &i->read_index) < 0) {
1587
1588             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1589             goto finish;
1590         }
1591
1592         if (o->context->version >= 13 &&
1593             o->stream->direction == PA_STREAM_PLAYBACK)
1594             if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1595                 pa_tagstruct_getu64(t, &playing_for) < 0) {
1596
1597                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1598                 goto finish;
1599             }
1600
1601
1602         if (!pa_tagstruct_eof(t)) {
1603             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1604             goto finish;
1605         }
1606         o->stream->timing_info_valid = TRUE;
1607         i->write_index_corrupt = FALSE;
1608         i->read_index_corrupt = FALSE;
1609
1610         i->playing = (int) playing;
1611         i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1612
1613         pa_gettimeofday(&now);
1614
1615         /* Calculcate timestamps */
1616         if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1617             /* local and remote seem to have synchronized clocks */
1618
1619             if (o->stream->direction == PA_STREAM_PLAYBACK)
1620                 i->transport_usec = pa_timeval_diff(&remote, &local);
1621             else
1622                 i->transport_usec = pa_timeval_diff(&now, &remote);
1623
1624             i->synchronized_clocks = TRUE;
1625             i->timestamp = remote;
1626         } else {
1627             /* clocks are not synchronized, let's estimate latency then */
1628             i->transport_usec = pa_timeval_diff(&now, &local)/2;
1629             i->synchronized_clocks = FALSE;
1630             i->timestamp = local;
1631             pa_timeval_add(&i->timestamp, i->transport_usec);
1632         }
1633
1634         /* Invalidate read and write indexes if necessary */
1635         if (tag < o->stream->read_index_not_before)
1636             i->read_index_corrupt = TRUE;
1637
1638         if (tag < o->stream->write_index_not_before)
1639             i->write_index_corrupt = TRUE;
1640
1641         if (o->stream->direction == PA_STREAM_PLAYBACK) {
1642             /* Write index correction */
1643
1644             int n, j;
1645             uint32_t ctag = tag;
1646
1647             /* Go through the saved correction values and add up the
1648              * total correction.*/
1649             for (n = 0, j = o->stream->current_write_index_correction+1;
1650                  n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1651                  n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1652
1653                 /* Step over invalid data or out-of-date data */
1654                 if (!o->stream->write_index_corrections[j].valid ||
1655                     o->stream->write_index_corrections[j].tag < ctag)
1656                     continue;
1657
1658                 /* Make sure that everything is in order */
1659                 ctag = o->stream->write_index_corrections[j].tag+1;
1660
1661                 /* Now fix the write index */
1662                 if (o->stream->write_index_corrections[j].corrupt) {
1663                     /* A corrupting seek was made */
1664                     i->write_index_corrupt = TRUE;
1665                 } else if (o->stream->write_index_corrections[j].absolute) {
1666                     /* An absolute seek was made */
1667                     i->write_index = o->stream->write_index_corrections[j].value;
1668                     i->write_index_corrupt = FALSE;
1669                 } else if (!i->write_index_corrupt) {
1670                     /* A relative seek was made */
1671                     i->write_index += o->stream->write_index_corrections[j].value;
1672                 }
1673             }
1674
1675             /* Clear old correction entries */
1676             for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1677                 if (!o->stream->write_index_corrections[n].valid)
1678                     continue;
1679
1680                 if (o->stream->write_index_corrections[n].tag <= tag)
1681                     o->stream->write_index_corrections[n].valid = FALSE;
1682             }
1683         }
1684
1685         if (o->stream->direction == PA_STREAM_RECORD) {
1686             /* Read index correction */
1687
1688             if (!i->read_index_corrupt)
1689                 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1690         }
1691
1692         /* Update smoother */
1693         if (o->stream->smoother) {
1694             pa_usec_t u, x;
1695
1696             u = x = pa_rtclock_now() - i->transport_usec;
1697
1698             if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1699                 pa_usec_t su;
1700
1701                 /* If we weren't playing then it will take some time
1702                  * until the audio will actually come out through the
1703                  * speakers. Since we follow that timing here, we need
1704                  * to try to fix this up */
1705
1706                 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1707
1708                 if (su < i->sink_usec)
1709                     x += i->sink_usec - su;
1710             }
1711
1712             if (!i->playing)
1713                 pa_smoother_pause(o->stream->smoother, x);
1714
1715             /* Update the smoother */
1716             if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1717                 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1718                 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1719
1720             if (i->playing)
1721                 pa_smoother_resume(o->stream->smoother, x, TRUE);
1722         }
1723     }
1724
1725     o->stream->auto_timing_update_requested = FALSE;
1726
1727     if (o->stream->latency_update_callback)
1728         o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1729
1730     if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1731         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1732         cb(o->stream, o->stream->timing_info_valid, o->userdata);
1733     }
1734
1735 finish:
1736
1737     pa_operation_done(o);
1738     pa_operation_unref(o);
1739 }
1740
1741 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1742     uint32_t tag;
1743     pa_operation *o;
1744     pa_tagstruct *t;
1745     struct timeval now;
1746     int cidx = 0;
1747
1748     pa_assert(s);
1749     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1750
1751     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1752     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1753     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1754
1755     if (s->direction == PA_STREAM_PLAYBACK) {
1756         /* Find a place to store the write_index correction data for this entry */
1757         cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1758
1759         /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1760         PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1761     }
1762     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1763
1764     t = pa_tagstruct_command(
1765             s->context,
1766             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1767             &tag);
1768     pa_tagstruct_putu32(t, s->channel);
1769     pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1770
1771     pa_pstream_send_tagstruct(s->context->pstream, t);
1772     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1773
1774     if (s->direction == PA_STREAM_PLAYBACK) {
1775         /* Fill in initial correction data */
1776
1777         s->current_write_index_correction = cidx;
1778
1779         s->write_index_corrections[cidx].valid = TRUE;
1780         s->write_index_corrections[cidx].absolute = FALSE;
1781         s->write_index_corrections[cidx].corrupt = FALSE;
1782         s->write_index_corrections[cidx].tag = tag;
1783         s->write_index_corrections[cidx].value = 0;
1784     }
1785
1786     return o;
1787 }
1788
1789 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1790     pa_stream *s = userdata;
1791
1792     pa_assert(pd);
1793     pa_assert(s);
1794     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1795
1796     pa_stream_ref(s);
1797
1798     if (command != PA_COMMAND_REPLY) {
1799         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1800             goto finish;
1801
1802         pa_stream_set_state(s, PA_STREAM_FAILED);
1803         goto finish;
1804     } else if (!pa_tagstruct_eof(t)) {
1805         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1806         goto finish;
1807     }
1808
1809     pa_stream_set_state(s, PA_STREAM_TERMINATED);
1810
1811 finish:
1812     pa_stream_unref(s);
1813 }
1814
1815 int pa_stream_disconnect(pa_stream *s) {
1816     pa_tagstruct *t;
1817     uint32_t tag;
1818
1819     pa_assert(s);
1820     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1821
1822     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1823     PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1824     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1825
1826     pa_stream_ref(s);
1827
1828     t = pa_tagstruct_command(
1829             s->context,
1830             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1831                         (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1832             &tag);
1833     pa_tagstruct_putu32(t, s->channel);
1834     pa_pstream_send_tagstruct(s->context->pstream, t);
1835     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1836
1837     pa_stream_unref(s);
1838     return 0;
1839 }
1840
1841 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1842     pa_assert(s);
1843     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1844
1845     if (pa_detect_fork())
1846         return;
1847
1848     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1849         return;
1850
1851     s->read_callback = cb;
1852     s->read_userdata = userdata;
1853 }
1854
1855 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1856     pa_assert(s);
1857     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1858
1859     if (pa_detect_fork())
1860         return;
1861
1862     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1863         return;
1864
1865     s->write_callback = cb;
1866     s->write_userdata = userdata;
1867 }
1868
1869 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1870     pa_assert(s);
1871     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1872
1873     if (pa_detect_fork())
1874         return;
1875
1876     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1877         return;
1878
1879     s->state_callback = cb;
1880     s->state_userdata = userdata;
1881 }
1882
1883 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1884     pa_assert(s);
1885     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1886
1887     if (pa_detect_fork())
1888         return;
1889
1890     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1891         return;
1892
1893     s->overflow_callback = cb;
1894     s->overflow_userdata = userdata;
1895 }
1896
1897 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1898     pa_assert(s);
1899     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1900
1901     if (pa_detect_fork())
1902         return;
1903
1904     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1905         return;
1906
1907     s->underflow_callback = cb;
1908     s->underflow_userdata = userdata;
1909 }
1910
1911 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1912     pa_assert(s);
1913     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1914
1915     if (pa_detect_fork())
1916         return;
1917
1918     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1919         return;
1920
1921     s->latency_update_callback = cb;
1922     s->latency_update_userdata = userdata;
1923 }
1924
1925 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1926     pa_assert(s);
1927     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1928
1929     if (pa_detect_fork())
1930         return;
1931
1932     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1933         return;
1934
1935     s->moved_callback = cb;
1936     s->moved_userdata = userdata;
1937 }
1938
1939 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1940     pa_assert(s);
1941     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1942
1943     if (pa_detect_fork())
1944         return;
1945
1946     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1947         return;
1948
1949     s->suspended_callback = cb;
1950     s->suspended_userdata = userdata;
1951 }
1952
1953 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1954     pa_assert(s);
1955     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1956
1957     if (pa_detect_fork())
1958         return;
1959
1960     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1961         return;
1962
1963     s->started_callback = cb;
1964     s->started_userdata = userdata;
1965 }
1966
1967 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
1968     pa_assert(s);
1969     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1970
1971     if (pa_detect_fork())
1972         return;
1973
1974     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1975         return;
1976
1977     s->event_callback = cb;
1978     s->event_userdata = userdata;
1979 }
1980
1981 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1982     pa_assert(s);
1983     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1984
1985     if (pa_detect_fork())
1986         return;
1987
1988     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1989         return;
1990
1991     s->buffer_attr_callback = cb;
1992     s->buffer_attr_userdata = userdata;
1993 }
1994
1995 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1996     pa_operation *o = userdata;
1997     int success = 1;
1998
1999     pa_assert(pd);
2000     pa_assert(o);
2001     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2002
2003     if (!o->context)
2004         goto finish;
2005
2006     if (command != PA_COMMAND_REPLY) {
2007         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2008             goto finish;
2009
2010         success = 0;
2011     } else if (!pa_tagstruct_eof(t)) {
2012         pa_context_fail(o->context, PA_ERR_PROTOCOL);
2013         goto finish;
2014     }
2015
2016     if (o->callback) {
2017         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2018         cb(o->stream, success, o->userdata);
2019     }
2020
2021 finish:
2022     pa_operation_done(o);
2023     pa_operation_unref(o);
2024 }
2025
2026 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2027     pa_operation *o;
2028     pa_tagstruct *t;
2029     uint32_t tag;
2030
2031     pa_assert(s);
2032     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2033
2034     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2035     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2036     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2037
2038     s->corked = b;
2039
2040     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2041
2042     t = pa_tagstruct_command(
2043             s->context,
2044             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2045             &tag);
2046     pa_tagstruct_putu32(t, s->channel);
2047     pa_tagstruct_put_boolean(t, !!b);
2048     pa_pstream_send_tagstruct(s->context->pstream, t);
2049     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2050
2051     check_smoother_status(s, FALSE, FALSE, FALSE);
2052
2053     /* This might cause the indexes to hang/start again, hence
2054      * let's request a timing update */
2055     request_auto_timing_update(s, TRUE);
2056
2057     return o;
2058 }
2059
2060 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2061     pa_tagstruct *t;
2062     pa_operation *o;
2063     uint32_t tag;
2064
2065     pa_assert(s);
2066     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2067
2068     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2069     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2070
2071     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2072
2073     t = pa_tagstruct_command(s->context, command, &tag);
2074     pa_tagstruct_putu32(t, s->channel);
2075     pa_pstream_send_tagstruct(s->context->pstream, t);
2076     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2077
2078     return o;
2079 }
2080
2081 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2082     pa_operation *o;
2083
2084     pa_assert(s);
2085     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2086
2087     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2088     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2089     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2090
2091     if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2092         return NULL;
2093
2094     if (s->direction == PA_STREAM_PLAYBACK) {
2095
2096         if (s->write_index_corrections[s->current_write_index_correction].valid)
2097             s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2098
2099         if (s->buffer_attr.prebuf > 0)
2100             check_smoother_status(s, FALSE, FALSE, TRUE);
2101
2102         /* This will change the write index, but leave the
2103          * read index untouched. */
2104         invalidate_indexes(s, FALSE, TRUE);
2105
2106     } else
2107         /* For record streams this has no influence on the write
2108          * index, but the read index might jump. */
2109         invalidate_indexes(s, TRUE, FALSE);
2110
2111     return o;
2112 }
2113
2114 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2115     pa_operation *o;
2116
2117     pa_assert(s);
2118     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2119
2120     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2121     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2122     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2123     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2124
2125     if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2126         return NULL;
2127
2128     /* This might cause the read index to hang again, hence
2129      * let's request a timing update */
2130     request_auto_timing_update(s, TRUE);
2131
2132     return o;
2133 }
2134
2135 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2136     pa_operation *o;
2137
2138     pa_assert(s);
2139     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2140
2141     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2142     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2143     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2144     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2145
2146     if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2147         return NULL;
2148
2149     /* This might cause the read index to start moving again, hence
2150      * let's request a timing update */
2151     request_auto_timing_update(s, TRUE);
2152
2153     return o;
2154 }
2155
2156 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2157     pa_operation *o;
2158
2159     pa_assert(s);
2160     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2161     pa_assert(name);
2162
2163     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2164     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2165     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2166
2167     if (s->context->version >= 13) {
2168         pa_proplist *p = pa_proplist_new();
2169
2170         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2171         o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2172         pa_proplist_free(p);
2173     } else {
2174         pa_tagstruct *t;
2175         uint32_t tag;
2176
2177         o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2178         t = pa_tagstruct_command(
2179                 s->context,
2180                 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2181                 &tag);
2182         pa_tagstruct_putu32(t, s->channel);
2183         pa_tagstruct_puts(t, name);
2184         pa_pstream_send_tagstruct(s->context->pstream, t);
2185         pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2186     }
2187
2188     return o;
2189 }
2190
2191 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2192     pa_usec_t usec;
2193
2194     pa_assert(s);
2195     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2196
2197     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2198     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2199     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2200     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2201     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2202     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2203
2204     if (s->smoother)
2205         usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2206     else
2207         usec = calc_time(s, FALSE);
2208
2209     /* Make sure the time runs monotonically */
2210     if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2211         if (usec < s->previous_time)
2212             usec = s->previous_time;
2213         else
2214             s->previous_time = usec;
2215     }
2216
2217     if (r_usec)
2218         *r_usec = usec;
2219
2220     return 0;
2221 }
2222
2223 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2224     pa_assert(s);
2225     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2226
2227     if (negative)
2228         *negative = 0;
2229
2230     if (a >= b)
2231         return a-b;
2232     else {
2233         if (negative && s->direction == PA_STREAM_RECORD) {
2234             *negative = 1;
2235             return b-a;
2236         } else
2237             return 0;
2238     }
2239 }
2240
2241 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2242     pa_usec_t t, c;
2243     int r;
2244     int64_t cindex;
2245
2246     pa_assert(s);
2247     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2248     pa_assert(r_usec);
2249
2250     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2251     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2252     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2253     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2254     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2255     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2256
2257     if ((r = pa_stream_get_time(s, &t)) < 0)
2258         return r;
2259
2260     if (s->direction == PA_STREAM_PLAYBACK)
2261         cindex = s->timing_info.write_index;
2262     else
2263         cindex = s->timing_info.read_index;
2264
2265     if (cindex < 0)
2266         cindex = 0;
2267
2268     c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2269
2270     if (s->direction == PA_STREAM_PLAYBACK)
2271         *r_usec = time_counter_diff(s, c, t, negative);
2272     else
2273         *r_usec = time_counter_diff(s, t, c, negative);
2274
2275     return 0;
2276 }
2277
2278 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2279     pa_assert(s);
2280     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2281
2282     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2283     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2284     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2285     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2286
2287     return &s->timing_info;
2288 }
2289
2290 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2291     pa_assert(s);
2292     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2293
2294     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2295
2296     return &s->sample_spec;
2297 }
2298
2299 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2300     pa_assert(s);
2301     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2302
2303     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2304
2305     return &s->channel_map;
2306 }
2307
2308 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2309     pa_assert(s);
2310     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2311
2312     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2313     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2314     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2315
2316     return &s->buffer_attr;
2317 }
2318
2319 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2320     pa_operation *o = userdata;
2321     int success = 1;
2322
2323     pa_assert(pd);
2324     pa_assert(o);
2325     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2326
2327     if (!o->context)
2328         goto finish;
2329
2330     if (command != PA_COMMAND_REPLY) {
2331         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2332             goto finish;
2333
2334         success = 0;
2335     } else {
2336         if (o->stream->direction == PA_STREAM_PLAYBACK) {
2337             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2338                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2339                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2340                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2341                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2342                 goto finish;
2343             }
2344         } else if (o->stream->direction == PA_STREAM_RECORD) {
2345             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2346                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2347                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2348                 goto finish;
2349             }
2350         }
2351
2352         if (o->stream->context->version >= 13) {
2353             pa_usec_t usec;
2354
2355             if (pa_tagstruct_get_usec(t, &usec) < 0) {
2356                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2357                 goto finish;
2358             }
2359
2360             if (o->stream->direction == PA_STREAM_RECORD)
2361                 o->stream->timing_info.configured_source_usec = usec;
2362             else
2363                 o->stream->timing_info.configured_sink_usec = usec;
2364         }
2365
2366         if (!pa_tagstruct_eof(t)) {
2367             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2368             goto finish;
2369         }
2370     }
2371
2372     if (o->callback) {
2373         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2374         cb(o->stream, success, o->userdata);
2375     }
2376
2377 finish:
2378     pa_operation_done(o);
2379     pa_operation_unref(o);
2380 }
2381
2382
2383 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2384     pa_operation *o;
2385     pa_tagstruct *t;
2386     uint32_t tag;
2387
2388     pa_assert(s);
2389     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2390     pa_assert(attr);
2391
2392     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2393     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2394     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2395     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2396
2397     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2398
2399     t = pa_tagstruct_command(
2400             s->context,
2401             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2402             &tag);
2403     pa_tagstruct_putu32(t, s->channel);
2404
2405     pa_tagstruct_putu32(t, attr->maxlength);
2406
2407     if (s->direction == PA_STREAM_PLAYBACK)
2408         pa_tagstruct_put(
2409                 t,
2410                 PA_TAG_U32, attr->tlength,
2411                 PA_TAG_U32, attr->prebuf,
2412                 PA_TAG_U32, attr->minreq,
2413                 PA_TAG_INVALID);
2414     else
2415         pa_tagstruct_putu32(t, attr->fragsize);
2416
2417     if (s->context->version >= 13)
2418         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2419
2420     if (s->context->version >= 14)
2421         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2422
2423     pa_pstream_send_tagstruct(s->context->pstream, t);
2424     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2425
2426     /* This might cause changes in the read/write indexex, hence let's
2427      * request a timing update */
2428     request_auto_timing_update(s, TRUE);
2429
2430     return o;
2431 }
2432
2433 uint32_t pa_stream_get_device_index(pa_stream *s) {
2434     pa_assert(s);
2435     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2436
2437     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2438     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2439     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2440     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2441     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2442
2443     return s->device_index;
2444 }
2445
2446 const char *pa_stream_get_device_name(pa_stream *s) {
2447     pa_assert(s);
2448     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2449
2450     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2451     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2452     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2453     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2454     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2455
2456     return s->device_name;
2457 }
2458
2459 int pa_stream_is_suspended(pa_stream *s) {
2460     pa_assert(s);
2461     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2462
2463     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2464     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2465     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2466     PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2467
2468     return s->suspended;
2469 }
2470
2471 int pa_stream_is_corked(pa_stream *s) {
2472     pa_assert(s);
2473     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2474
2475     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2476     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2477     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2478
2479     return s->corked;
2480 }
2481
2482 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2483     pa_operation *o = userdata;
2484     int success = 1;
2485
2486     pa_assert(pd);
2487     pa_assert(o);
2488     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2489
2490     if (!o->context)
2491         goto finish;
2492
2493     if (command != PA_COMMAND_REPLY) {
2494         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2495             goto finish;
2496
2497         success = 0;
2498     } else {
2499
2500         if (!pa_tagstruct_eof(t)) {
2501             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2502             goto finish;
2503         }
2504     }
2505
2506     o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2507     pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2508
2509     if (o->callback) {
2510         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2511         cb(o->stream, success, o->userdata);
2512     }
2513
2514 finish:
2515     pa_operation_done(o);
2516     pa_operation_unref(o);
2517 }
2518
2519
2520 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2521     pa_operation *o;
2522     pa_tagstruct *t;
2523     uint32_t tag;
2524
2525     pa_assert(s);
2526     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2527
2528     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2529     PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2530     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2531     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2532     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2533     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2534
2535     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2536     o->private = PA_UINT_TO_PTR(rate);
2537
2538     t = pa_tagstruct_command(
2539             s->context,
2540             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2541             &tag);
2542     pa_tagstruct_putu32(t, s->channel);
2543     pa_tagstruct_putu32(t, rate);
2544
2545     pa_pstream_send_tagstruct(s->context->pstream, t);
2546     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2547
2548     return o;
2549 }
2550
2551 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2552     pa_operation *o;
2553     pa_tagstruct *t;
2554     uint32_t tag;
2555
2556     pa_assert(s);
2557     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2558
2559     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2560     PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2561     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2562     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2563     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2564
2565     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2566
2567     t = pa_tagstruct_command(
2568             s->context,
2569             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2570             &tag);
2571     pa_tagstruct_putu32(t, s->channel);
2572     pa_tagstruct_putu32(t, (uint32_t) mode);
2573     pa_tagstruct_put_proplist(t, p);
2574
2575     pa_pstream_send_tagstruct(s->context->pstream, t);
2576     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2577
2578     /* Please note that we don't update s->proplist here, because we
2579      * don't export that field */
2580
2581     return o;
2582 }
2583
2584 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2585     pa_operation *o;
2586     pa_tagstruct *t;
2587     uint32_t tag;
2588     const char * const*k;
2589
2590     pa_assert(s);
2591     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2592
2593     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2594     PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2595     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2596     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2597     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2598
2599     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2600
2601     t = pa_tagstruct_command(
2602             s->context,
2603             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2604             &tag);
2605     pa_tagstruct_putu32(t, s->channel);
2606
2607     for (k = keys; *k; k++)
2608         pa_tagstruct_puts(t, *k);
2609
2610     pa_tagstruct_puts(t, NULL);
2611
2612     pa_pstream_send_tagstruct(s->context->pstream, t);
2613     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2614
2615     /* Please note that we don't update s->proplist here, because we
2616      * don't export that field */
2617
2618     return o;
2619 }
2620
2621 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2622     pa_assert(s);
2623     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2624
2625     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2626     PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2627     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2628     PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2629
2630     s->direct_on_input = sink_input_idx;
2631
2632     return 0;
2633 }
2634
2635 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2636     pa_assert(s);
2637     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2638
2639     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2640     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2641     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2642
2643     return s->direct_on_input;
2644 }