client: detect forking in sample cache API, too
[profile/ivi/pulseaudio.git] / src / pulse / stream.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <pulse/def.h>
32 #include <pulse/stream.h>
33 #include <pulse/timeval.h>
34 #include <pulse/rtclock.h>
35 #include <pulse/xmalloc.h>
36
37 #include <pulsecore/pstream-util.h>
38 #include <pulsecore/log.h>
39 #include <pulsecore/hashmap.h>
40 #include <pulsecore/macro.h>
41 #include <pulsecore/core-rtclock.h>
42
43 #include "fork-detect.h"
44 #include "internal.h"
45
46 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
47 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
48
49 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
50 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
51 #define SMOOTHER_MIN_HISTORY (4)
52
53 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
54     return pa_stream_new_with_proplist(c, name, ss, map, NULL);
55 }
56
57 static void reset_callbacks(pa_stream *s) {
58     s->read_callback = NULL;
59     s->read_userdata = NULL;
60     s->write_callback = NULL;
61     s->write_userdata = NULL;
62     s->state_callback = NULL;
63     s->state_userdata = NULL;
64     s->overflow_callback = NULL;
65     s->overflow_userdata = NULL;
66     s->underflow_callback = NULL;
67     s->underflow_userdata = NULL;
68     s->latency_update_callback = NULL;
69     s->latency_update_userdata = NULL;
70     s->moved_callback = NULL;
71     s->moved_userdata = NULL;
72     s->suspended_callback = NULL;
73     s->suspended_userdata = NULL;
74     s->started_callback = NULL;
75     s->started_userdata = NULL;
76     s->event_callback = NULL;
77     s->event_userdata = NULL;
78     s->buffer_attr_callback = NULL;
79     s->buffer_attr_userdata = NULL;
80 }
81
82 pa_stream *pa_stream_new_with_proplist(
83         pa_context *c,
84         const char *name,
85         const pa_sample_spec *ss,
86         const pa_channel_map *map,
87         pa_proplist *p) {
88
89     pa_stream *s;
90     int i;
91     pa_channel_map tmap;
92
93     pa_assert(c);
94     pa_assert(PA_REFCNT_VALUE(c) >= 1);
95
96     PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
97     PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
98     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
99     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
100     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
101     PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
102     PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
103
104     if (!map)
105         PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
106
107     s = pa_xnew(pa_stream, 1);
108     PA_REFCNT_INIT(s);
109     s->context = c;
110     s->mainloop = c->mainloop;
111
112     s->direction = PA_STREAM_NODIRECTION;
113     s->state = PA_STREAM_UNCONNECTED;
114     s->flags = 0;
115
116     s->sample_spec = *ss;
117     s->channel_map = *map;
118
119     s->direct_on_input = PA_INVALID_INDEX;
120
121     s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
122     if (name)
123         pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
124
125     s->channel = 0;
126     s->channel_valid = FALSE;
127     s->syncid = c->csyncid++;
128     s->stream_index = PA_INVALID_INDEX;
129
130     s->requested_bytes = 0;
131     memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
132
133     /* We initialize der target length here, so that if the user
134      * passes no explicit buffering metrics the default is similar to
135      * what older PA versions provided. */
136
137     s->buffer_attr.maxlength = (uint32_t) -1;
138     s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
139     s->buffer_attr.minreq = (uint32_t) -1;
140     s->buffer_attr.prebuf = (uint32_t) -1;
141     s->buffer_attr.fragsize = (uint32_t) -1;
142
143     s->device_index = PA_INVALID_INDEX;
144     s->device_name = NULL;
145     s->suspended = FALSE;
146     s->corked = FALSE;
147
148     s->write_memblock = NULL;
149     s->write_data = NULL;
150
151     pa_memchunk_reset(&s->peek_memchunk);
152     s->peek_data = NULL;
153     s->record_memblockq = NULL;
154
155     memset(&s->timing_info, 0, sizeof(s->timing_info));
156     s->timing_info_valid = FALSE;
157
158     s->previous_time = 0;
159
160     s->read_index_not_before = 0;
161     s->write_index_not_before = 0;
162     for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
163         s->write_index_corrections[i].valid = 0;
164     s->current_write_index_correction = 0;
165
166     s->auto_timing_update_event = NULL;
167     s->auto_timing_update_requested = FALSE;
168     s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
169
170     reset_callbacks(s);
171
172     s->smoother = NULL;
173
174     /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
175     PA_LLIST_PREPEND(pa_stream, c->streams, s);
176     pa_stream_ref(s);
177
178     return s;
179 }
180
181 static void stream_unlink(pa_stream *s) {
182     pa_operation *o, *n;
183     pa_assert(s);
184
185     if (!s->context)
186         return;
187
188     /* Detach from context */
189
190     /* Unref all operatio object that point to us */
191     for (o = s->context->operations; o; o = n) {
192         n = o->next;
193
194         if (o->stream == s)
195             pa_operation_cancel(o);
196     }
197
198     /* Drop all outstanding replies for this stream */
199     if (s->context->pdispatch)
200         pa_pdispatch_unregister_reply(s->context->pdispatch, s);
201
202     if (s->channel_valid) {
203         pa_hashmap_remove((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, PA_UINT32_TO_PTR(s->channel));
204         s->channel = 0;
205         s->channel_valid = FALSE;
206     }
207
208     PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
209     pa_stream_unref(s);
210
211     s->context = NULL;
212
213     if (s->auto_timing_update_event) {
214         pa_assert(s->mainloop);
215         s->mainloop->time_free(s->auto_timing_update_event);
216     }
217
218     reset_callbacks(s);
219 }
220
221 static void stream_free(pa_stream *s) {
222     pa_assert(s);
223
224     stream_unlink(s);
225
226     if (s->write_memblock) {
227         pa_memblock_release(s->write_memblock);
228         pa_memblock_unref(s->write_data);
229     }
230
231     if (s->peek_memchunk.memblock) {
232         if (s->peek_data)
233             pa_memblock_release(s->peek_memchunk.memblock);
234         pa_memblock_unref(s->peek_memchunk.memblock);
235     }
236
237     if (s->record_memblockq)
238         pa_memblockq_free(s->record_memblockq);
239
240     if (s->proplist)
241         pa_proplist_free(s->proplist);
242
243     if (s->smoother)
244         pa_smoother_free(s->smoother);
245
246     pa_xfree(s->device_name);
247     pa_xfree(s);
248 }
249
250 void pa_stream_unref(pa_stream *s) {
251     pa_assert(s);
252     pa_assert(PA_REFCNT_VALUE(s) >= 1);
253
254     if (PA_REFCNT_DEC(s) <= 0)
255         stream_free(s);
256 }
257
258 pa_stream* pa_stream_ref(pa_stream *s) {
259     pa_assert(s);
260     pa_assert(PA_REFCNT_VALUE(s) >= 1);
261
262     PA_REFCNT_INC(s);
263     return s;
264 }
265
266 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
267     pa_assert(s);
268     pa_assert(PA_REFCNT_VALUE(s) >= 1);
269
270     return s->state;
271 }
272
273 pa_context* pa_stream_get_context(pa_stream *s) {
274     pa_assert(s);
275     pa_assert(PA_REFCNT_VALUE(s) >= 1);
276
277     return s->context;
278 }
279
280 uint32_t pa_stream_get_index(pa_stream *s) {
281     pa_assert(s);
282     pa_assert(PA_REFCNT_VALUE(s) >= 1);
283
284     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
285     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
286
287     return s->stream_index;
288 }
289
290 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
291     pa_assert(s);
292     pa_assert(PA_REFCNT_VALUE(s) >= 1);
293
294     if (s->state == st)
295         return;
296
297     pa_stream_ref(s);
298
299     s->state = st;
300
301     if (s->state_callback)
302         s->state_callback(s, s->state_userdata);
303
304     if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
305         stream_unlink(s);
306
307     pa_stream_unref(s);
308 }
309
310 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
311     pa_assert(s);
312     pa_assert(PA_REFCNT_VALUE(s) >= 1);
313
314     if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
315         return;
316
317     if (s->state == PA_STREAM_READY &&
318         (force || !s->auto_timing_update_requested)) {
319         pa_operation *o;
320
321 /*         pa_log("Automatically requesting new timing data"); */
322
323         if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
324             pa_operation_unref(o);
325             s->auto_timing_update_requested = TRUE;
326         }
327     }
328
329     if (s->auto_timing_update_event) {
330         if (force)
331             s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
332
333         pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
334
335         s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
336     }
337 }
338
339 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
340     pa_context *c = userdata;
341     pa_stream *s;
342     uint32_t channel;
343
344     pa_assert(pd);
345     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
346     pa_assert(t);
347     pa_assert(c);
348     pa_assert(PA_REFCNT_VALUE(c) >= 1);
349
350     pa_context_ref(c);
351
352     if (pa_tagstruct_getu32(t, &channel) < 0 ||
353         !pa_tagstruct_eof(t)) {
354         pa_context_fail(c, PA_ERR_PROTOCOL);
355         goto finish;
356     }
357
358     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
359         goto finish;
360
361     if (s->state != PA_STREAM_READY)
362         goto finish;
363
364     pa_context_set_error(c, PA_ERR_KILLED);
365     pa_stream_set_state(s, PA_STREAM_FAILED);
366
367 finish:
368     pa_context_unref(c);
369 }
370
371 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
372     pa_usec_t x;
373
374     pa_assert(s);
375     pa_assert(!force_start || !force_stop);
376
377     if (!s->smoother)
378         return;
379
380     x = pa_rtclock_now();
381
382     if (s->timing_info_valid) {
383         if (aposteriori)
384             x -= s->timing_info.transport_usec;
385         else
386             x += s->timing_info.transport_usec;
387     }
388
389     if (s->suspended || s->corked || force_stop)
390         pa_smoother_pause(s->smoother, x);
391     else if (force_start || s->buffer_attr.prebuf == 0) {
392
393         if (!s->timing_info_valid &&
394             !aposteriori &&
395             !force_start &&
396             !force_stop &&
397             s->context->version >= 13) {
398
399             /* If the server supports STARTED events we take them as
400              * indications when audio really starts/stops playing, if
401              * we don't have any timing info yet -- instead of trying
402              * to be smart and guessing the server time. Otherwise the
403              * unknown transport delay add too much noise to our time
404              * calculations. */
405
406             return;
407         }
408
409         pa_smoother_resume(s->smoother, x, TRUE);
410     }
411
412     /* Please note that we have no idea if playback actually started
413      * if prebuf is non-zero! */
414 }
415
416 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
417     pa_context *c = userdata;
418     pa_stream *s;
419     uint32_t channel;
420     const char *dn;
421     pa_bool_t suspended;
422     uint32_t di;
423     pa_usec_t usec = 0;
424     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
425
426     pa_assert(pd);
427     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
428     pa_assert(t);
429     pa_assert(c);
430     pa_assert(PA_REFCNT_VALUE(c) >= 1);
431
432     pa_context_ref(c);
433
434     if (c->version < 12) {
435         pa_context_fail(c, PA_ERR_PROTOCOL);
436         goto finish;
437     }
438
439     if (pa_tagstruct_getu32(t, &channel) < 0 ||
440         pa_tagstruct_getu32(t, &di) < 0 ||
441         pa_tagstruct_gets(t, &dn) < 0 ||
442         pa_tagstruct_get_boolean(t, &suspended) < 0) {
443         pa_context_fail(c, PA_ERR_PROTOCOL);
444         goto finish;
445     }
446
447     if (c->version >= 13) {
448
449         if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
450             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
451                 pa_tagstruct_getu32(t, &fragsize) < 0 ||
452                 pa_tagstruct_get_usec(t, &usec) < 0) {
453                 pa_context_fail(c, PA_ERR_PROTOCOL);
454                 goto finish;
455             }
456         } else {
457             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
458                 pa_tagstruct_getu32(t, &tlength) < 0 ||
459                 pa_tagstruct_getu32(t, &prebuf) < 0 ||
460                 pa_tagstruct_getu32(t, &minreq) < 0 ||
461                 pa_tagstruct_get_usec(t, &usec) < 0) {
462                 pa_context_fail(c, PA_ERR_PROTOCOL);
463                 goto finish;
464             }
465         }
466     }
467
468     if (!pa_tagstruct_eof(t)) {
469         pa_context_fail(c, PA_ERR_PROTOCOL);
470         goto finish;
471     }
472
473     if (!dn || di == PA_INVALID_INDEX) {
474         pa_context_fail(c, PA_ERR_PROTOCOL);
475         goto finish;
476     }
477
478     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
479         goto finish;
480
481     if (s->state != PA_STREAM_READY)
482         goto finish;
483
484     if (c->version >= 13) {
485         if (s->direction == PA_STREAM_RECORD)
486             s->timing_info.configured_source_usec = usec;
487         else
488             s->timing_info.configured_sink_usec = usec;
489
490         s->buffer_attr.maxlength = maxlength;
491         s->buffer_attr.fragsize = fragsize;
492         s->buffer_attr.tlength = tlength;
493         s->buffer_attr.prebuf = prebuf;
494         s->buffer_attr.minreq = minreq;
495     }
496
497     pa_xfree(s->device_name);
498     s->device_name = pa_xstrdup(dn);
499     s->device_index = di;
500
501     s->suspended = suspended;
502
503     check_smoother_status(s, TRUE, FALSE, FALSE);
504     request_auto_timing_update(s, TRUE);
505
506     if (s->moved_callback)
507         s->moved_callback(s, s->moved_userdata);
508
509 finish:
510     pa_context_unref(c);
511 }
512
513 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
514     pa_context *c = userdata;
515     pa_stream *s;
516     uint32_t channel;
517     pa_usec_t usec = 0;
518     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
519
520     pa_assert(pd);
521     pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
522     pa_assert(t);
523     pa_assert(c);
524     pa_assert(PA_REFCNT_VALUE(c) >= 1);
525
526     pa_context_ref(c);
527
528     if (c->version < 15) {
529         pa_context_fail(c, PA_ERR_PROTOCOL);
530         goto finish;
531     }
532
533     if (pa_tagstruct_getu32(t, &channel) < 0) {
534         pa_context_fail(c, PA_ERR_PROTOCOL);
535         goto finish;
536     }
537
538     if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
539         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
540             pa_tagstruct_getu32(t, &fragsize) < 0 ||
541             pa_tagstruct_get_usec(t, &usec) < 0) {
542             pa_context_fail(c, PA_ERR_PROTOCOL);
543             goto finish;
544         }
545     } else {
546         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
547             pa_tagstruct_getu32(t, &tlength) < 0 ||
548             pa_tagstruct_getu32(t, &prebuf) < 0 ||
549             pa_tagstruct_getu32(t, &minreq) < 0 ||
550             pa_tagstruct_get_usec(t, &usec) < 0) {
551             pa_context_fail(c, PA_ERR_PROTOCOL);
552             goto finish;
553         }
554     }
555
556     if (!pa_tagstruct_eof(t)) {
557         pa_context_fail(c, PA_ERR_PROTOCOL);
558         goto finish;
559     }
560
561     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
562         goto finish;
563
564     if (s->state != PA_STREAM_READY)
565         goto finish;
566
567     if (s->direction == PA_STREAM_RECORD)
568         s->timing_info.configured_source_usec = usec;
569     else
570         s->timing_info.configured_sink_usec = usec;
571
572     s->buffer_attr.maxlength = maxlength;
573     s->buffer_attr.fragsize = fragsize;
574     s->buffer_attr.tlength = tlength;
575     s->buffer_attr.prebuf = prebuf;
576     s->buffer_attr.minreq = minreq;
577
578     request_auto_timing_update(s, TRUE);
579
580     if (s->buffer_attr_callback)
581         s->buffer_attr_callback(s, s->buffer_attr_userdata);
582
583 finish:
584     pa_context_unref(c);
585 }
586
587 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
588     pa_context *c = userdata;
589     pa_stream *s;
590     uint32_t channel;
591     pa_bool_t suspended;
592
593     pa_assert(pd);
594     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
595     pa_assert(t);
596     pa_assert(c);
597     pa_assert(PA_REFCNT_VALUE(c) >= 1);
598
599     pa_context_ref(c);
600
601     if (c->version < 12) {
602         pa_context_fail(c, PA_ERR_PROTOCOL);
603         goto finish;
604     }
605
606     if (pa_tagstruct_getu32(t, &channel) < 0 ||
607         pa_tagstruct_get_boolean(t, &suspended) < 0 ||
608         !pa_tagstruct_eof(t)) {
609         pa_context_fail(c, PA_ERR_PROTOCOL);
610         goto finish;
611     }
612
613     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
614         goto finish;
615
616     if (s->state != PA_STREAM_READY)
617         goto finish;
618
619     s->suspended = suspended;
620
621     check_smoother_status(s, TRUE, FALSE, FALSE);
622     request_auto_timing_update(s, TRUE);
623
624     if (s->suspended_callback)
625         s->suspended_callback(s, s->suspended_userdata);
626
627 finish:
628     pa_context_unref(c);
629 }
630
631 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
632     pa_context *c = userdata;
633     pa_stream *s;
634     uint32_t channel;
635
636     pa_assert(pd);
637     pa_assert(command == PA_COMMAND_STARTED);
638     pa_assert(t);
639     pa_assert(c);
640     pa_assert(PA_REFCNT_VALUE(c) >= 1);
641
642     pa_context_ref(c);
643
644     if (c->version < 13) {
645         pa_context_fail(c, PA_ERR_PROTOCOL);
646         goto finish;
647     }
648
649     if (pa_tagstruct_getu32(t, &channel) < 0 ||
650         !pa_tagstruct_eof(t)) {
651         pa_context_fail(c, PA_ERR_PROTOCOL);
652         goto finish;
653     }
654
655     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
656         goto finish;
657
658     if (s->state != PA_STREAM_READY)
659         goto finish;
660
661     check_smoother_status(s, TRUE, TRUE, FALSE);
662     request_auto_timing_update(s, TRUE);
663
664     if (s->started_callback)
665         s->started_callback(s, s->started_userdata);
666
667 finish:
668     pa_context_unref(c);
669 }
670
671 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
672     pa_context *c = userdata;
673     pa_stream *s;
674     uint32_t channel;
675     pa_proplist *pl = NULL;
676     const char *event;
677
678     pa_assert(pd);
679     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
680     pa_assert(t);
681     pa_assert(c);
682     pa_assert(PA_REFCNT_VALUE(c) >= 1);
683
684     pa_context_ref(c);
685
686     if (c->version < 15) {
687         pa_context_fail(c, PA_ERR_PROTOCOL);
688         goto finish;
689     }
690
691     pl = pa_proplist_new();
692
693     if (pa_tagstruct_getu32(t, &channel) < 0 ||
694         pa_tagstruct_gets(t, &event) < 0 ||
695         pa_tagstruct_get_proplist(t, pl) < 0 ||
696         !pa_tagstruct_eof(t) || !event) {
697         pa_context_fail(c, PA_ERR_PROTOCOL);
698         goto finish;
699     }
700
701     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
702         goto finish;
703
704     if (s->state != PA_STREAM_READY)
705         goto finish;
706
707     if (s->event_callback)
708         s->event_callback(s, event, pl, s->event_userdata);
709
710 finish:
711     pa_context_unref(c);
712
713     if (pl)
714         pa_proplist_free(pl);
715 }
716
717 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
718     pa_stream *s;
719     pa_context *c = userdata;
720     uint32_t bytes, channel;
721
722     pa_assert(pd);
723     pa_assert(command == PA_COMMAND_REQUEST);
724     pa_assert(t);
725     pa_assert(c);
726     pa_assert(PA_REFCNT_VALUE(c) >= 1);
727
728     pa_context_ref(c);
729
730     if (pa_tagstruct_getu32(t, &channel) < 0 ||
731         pa_tagstruct_getu32(t, &bytes) < 0 ||
732         !pa_tagstruct_eof(t)) {
733         pa_context_fail(c, PA_ERR_PROTOCOL);
734         goto finish;
735     }
736
737     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
738         goto finish;
739
740     if (s->state != PA_STREAM_READY)
741         goto finish;
742
743     s->requested_bytes += bytes;
744
745     /* pa_log("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes); */
746
747     if (s->requested_bytes > 0 && s->write_callback)
748         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
749
750 finish:
751     pa_context_unref(c);
752 }
753
754 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
755     pa_stream *s;
756     pa_context *c = userdata;
757     uint32_t channel;
758
759     pa_assert(pd);
760     pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
761     pa_assert(t);
762     pa_assert(c);
763     pa_assert(PA_REFCNT_VALUE(c) >= 1);
764
765     pa_context_ref(c);
766
767     if (pa_tagstruct_getu32(t, &channel) < 0 ||
768         !pa_tagstruct_eof(t)) {
769         pa_context_fail(c, PA_ERR_PROTOCOL);
770         goto finish;
771     }
772
773     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
774         goto finish;
775
776     if (s->state != PA_STREAM_READY)
777         goto finish;
778
779     if (s->buffer_attr.prebuf > 0)
780         check_smoother_status(s, TRUE, FALSE, TRUE);
781
782     request_auto_timing_update(s, TRUE);
783
784     if (command == PA_COMMAND_OVERFLOW) {
785         if (s->overflow_callback)
786             s->overflow_callback(s, s->overflow_userdata);
787     } else if (command == PA_COMMAND_UNDERFLOW) {
788         if (s->underflow_callback)
789             s->underflow_callback(s, s->underflow_userdata);
790     }
791
792  finish:
793     pa_context_unref(c);
794 }
795
796 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
797     pa_assert(s);
798     pa_assert(PA_REFCNT_VALUE(s) >= 1);
799
800 /*     pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
801
802     if (s->state != PA_STREAM_READY)
803         return;
804
805     if (w) {
806         s->write_index_not_before = s->context->ctag;
807
808         if (s->timing_info_valid)
809             s->timing_info.write_index_corrupt = TRUE;
810
811 /*         pa_log("write_index invalidated"); */
812     }
813
814     if (r) {
815         s->read_index_not_before = s->context->ctag;
816
817         if (s->timing_info_valid)
818             s->timing_info.read_index_corrupt = TRUE;
819
820 /*         pa_log("read_index invalidated"); */
821     }
822
823     request_auto_timing_update(s, TRUE);
824 }
825
826 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
827     pa_stream *s = userdata;
828
829     pa_assert(s);
830     pa_assert(PA_REFCNT_VALUE(s) >= 1);
831
832     pa_stream_ref(s);
833     request_auto_timing_update(s, FALSE);
834     pa_stream_unref(s);
835 }
836
837 static void create_stream_complete(pa_stream *s) {
838     pa_assert(s);
839     pa_assert(PA_REFCNT_VALUE(s) >= 1);
840     pa_assert(s->state == PA_STREAM_CREATING);
841
842     pa_stream_set_state(s, PA_STREAM_READY);
843
844     if (s->requested_bytes > 0 && s->write_callback)
845         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
846
847     if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
848         s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
849         pa_assert(!s->auto_timing_update_event);
850         s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
851
852         request_auto_timing_update(s, TRUE);
853     }
854
855     check_smoother_status(s, TRUE, FALSE, FALSE);
856 }
857
858 static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_sample_spec *ss) {
859     pa_assert(s);
860     pa_assert(attr);
861     pa_assert(ss);
862
863     if (s->context->version >= 13)
864         return;
865
866     /* Version older than 0.9.10 didn't do server side buffer_attr
867      * selection, hence we have to fake it on the client side. */
868
869     /* We choose fairly conservative values here, to not confuse
870      * old clients with extremely large playback buffers */
871
872     if (attr->maxlength == (uint32_t) -1)
873         attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
874
875     if (attr->tlength == (uint32_t) -1)
876         attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
877
878     if (attr->minreq == (uint32_t) -1)
879         attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
880
881     if (attr->prebuf == (uint32_t) -1)
882         attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
883
884     if (attr->fragsize == (uint32_t) -1)
885         attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
886 }
887
888 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
889     pa_stream *s = userdata;
890     uint32_t requested_bytes = 0;
891
892     pa_assert(pd);
893     pa_assert(s);
894     pa_assert(PA_REFCNT_VALUE(s) >= 1);
895     pa_assert(s->state == PA_STREAM_CREATING);
896
897     pa_stream_ref(s);
898
899     if (command != PA_COMMAND_REPLY) {
900         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
901             goto finish;
902
903         pa_stream_set_state(s, PA_STREAM_FAILED);
904         goto finish;
905     }
906
907     if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
908         s->channel == PA_INVALID_INDEX ||
909         ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 ||  s->stream_index == PA_INVALID_INDEX)) ||
910         ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
911         pa_context_fail(s->context, PA_ERR_PROTOCOL);
912         goto finish;
913     }
914
915     s->requested_bytes = (int64_t) requested_bytes;
916
917     if (s->context->version >= 9) {
918         if (s->direction == PA_STREAM_PLAYBACK) {
919             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
920                 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
921                 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
922                 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
923                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
924                 goto finish;
925             }
926         } else if (s->direction == PA_STREAM_RECORD) {
927             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
928                 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
929                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
930                 goto finish;
931             }
932         }
933     }
934
935     if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
936         pa_sample_spec ss;
937         pa_channel_map cm;
938         const char *dn = NULL;
939         pa_bool_t suspended;
940
941         if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
942             pa_tagstruct_get_channel_map(t, &cm) < 0 ||
943             pa_tagstruct_getu32(t, &s->device_index) < 0 ||
944             pa_tagstruct_gets(t, &dn) < 0 ||
945             pa_tagstruct_get_boolean(t, &suspended) < 0) {
946             pa_context_fail(s->context, PA_ERR_PROTOCOL);
947             goto finish;
948         }
949
950         if (!dn || s->device_index == PA_INVALID_INDEX ||
951             ss.channels != cm.channels ||
952             !pa_channel_map_valid(&cm) ||
953             !pa_sample_spec_valid(&ss) ||
954             (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
955             (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
956             (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))) {
957             pa_context_fail(s->context, PA_ERR_PROTOCOL);
958             goto finish;
959         }
960
961         pa_xfree(s->device_name);
962         s->device_name = pa_xstrdup(dn);
963         s->suspended = suspended;
964
965         s->channel_map = cm;
966         s->sample_spec = ss;
967     }
968
969     if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
970         pa_usec_t usec;
971
972         if (pa_tagstruct_get_usec(t, &usec) < 0) {
973             pa_context_fail(s->context, PA_ERR_PROTOCOL);
974             goto finish;
975         }
976
977         if (s->direction == PA_STREAM_RECORD)
978             s->timing_info.configured_source_usec = usec;
979         else
980             s->timing_info.configured_sink_usec = usec;
981     }
982
983     if (!pa_tagstruct_eof(t)) {
984         pa_context_fail(s->context, PA_ERR_PROTOCOL);
985         goto finish;
986     }
987
988     if (s->direction == PA_STREAM_RECORD) {
989         pa_assert(!s->record_memblockq);
990
991         s->record_memblockq = pa_memblockq_new(
992                 0,
993                 s->buffer_attr.maxlength,
994                 0,
995                 pa_frame_size(&s->sample_spec),
996                 1,
997                 0,
998                 0,
999                 NULL);
1000     }
1001
1002     s->channel_valid = TRUE;
1003     pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);
1004
1005     create_stream_complete(s);
1006
1007 finish:
1008     pa_stream_unref(s);
1009 }
1010
1011 static int create_stream(
1012         pa_stream_direction_t direction,
1013         pa_stream *s,
1014         const char *dev,
1015         const pa_buffer_attr *attr,
1016         pa_stream_flags_t flags,
1017         const pa_cvolume *volume,
1018         pa_stream *sync_stream) {
1019
1020     pa_tagstruct *t;
1021     uint32_t tag;
1022     pa_bool_t volume_set = FALSE;
1023
1024     pa_assert(s);
1025     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1026     pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1027
1028     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1029     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1030     PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1031     PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1032                                               PA_STREAM_INTERPOLATE_TIMING|
1033                                               PA_STREAM_NOT_MONOTONIC|
1034                                               PA_STREAM_AUTO_TIMING_UPDATE|
1035                                               PA_STREAM_NO_REMAP_CHANNELS|
1036                                               PA_STREAM_NO_REMIX_CHANNELS|
1037                                               PA_STREAM_FIX_FORMAT|
1038                                               PA_STREAM_FIX_RATE|
1039                                               PA_STREAM_FIX_CHANNELS|
1040                                               PA_STREAM_DONT_MOVE|
1041                                               PA_STREAM_VARIABLE_RATE|
1042                                               PA_STREAM_PEAK_DETECT|
1043                                               PA_STREAM_START_MUTED|
1044                                               PA_STREAM_ADJUST_LATENCY|
1045                                               PA_STREAM_EARLY_REQUESTS|
1046                                               PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1047                                               PA_STREAM_START_UNMUTED|
1048                                               PA_STREAM_FAIL_ON_SUSPEND|
1049                                               PA_STREAM_RELATIVE_VOLUME)), PA_ERR_INVALID);
1050
1051     PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1052     PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1053     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1054     /* Althought some of the other flags are not supported on older
1055      * version, we don't check for them here, because it doesn't hurt
1056      * when they are passed but actually not supported. This makes
1057      * client development easier */
1058
1059     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1060     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1061     PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
1062     PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1063     PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1064
1065     pa_stream_ref(s);
1066
1067     s->direction = direction;
1068     s->flags = flags;
1069     s->corked = !!(flags & PA_STREAM_START_CORKED);
1070
1071     if (sync_stream)
1072         s->syncid = sync_stream->syncid;
1073
1074     if (attr)
1075         s->buffer_attr = *attr;
1076     automatic_buffer_attr(s, &s->buffer_attr, &s->sample_spec);
1077
1078     if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1079         pa_usec_t x;
1080
1081         x = pa_rtclock_now();
1082
1083         pa_assert(!s->smoother);
1084         s->smoother = pa_smoother_new(
1085                 SMOOTHER_ADJUST_TIME,
1086                 SMOOTHER_HISTORY_TIME,
1087                 !(flags & PA_STREAM_NOT_MONOTONIC),
1088                 TRUE,
1089                 SMOOTHER_MIN_HISTORY,
1090                 x,
1091                 TRUE);
1092     }
1093
1094     if (!dev)
1095         dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1096
1097     t = pa_tagstruct_command(
1098             s->context,
1099             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1100             &tag);
1101
1102     if (s->context->version < 13)
1103         pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1104
1105     pa_tagstruct_put(
1106             t,
1107             PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1108             PA_TAG_CHANNEL_MAP, &s->channel_map,
1109             PA_TAG_U32, PA_INVALID_INDEX,
1110             PA_TAG_STRING, dev,
1111             PA_TAG_U32, s->buffer_attr.maxlength,
1112             PA_TAG_BOOLEAN, s->corked,
1113             PA_TAG_INVALID);
1114
1115     if (s->direction == PA_STREAM_PLAYBACK) {
1116         pa_cvolume cv;
1117
1118         pa_tagstruct_put(
1119                 t,
1120                 PA_TAG_U32, s->buffer_attr.tlength,
1121                 PA_TAG_U32, s->buffer_attr.prebuf,
1122                 PA_TAG_U32, s->buffer_attr.minreq,
1123                 PA_TAG_U32, s->syncid,
1124                 PA_TAG_INVALID);
1125
1126         volume_set = !!volume;
1127
1128         if (!volume)
1129             volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1130
1131         pa_tagstruct_put_cvolume(t, volume);
1132     } else
1133         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1134
1135     if (s->context->version >= 12) {
1136         pa_tagstruct_put(
1137                 t,
1138                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1139                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1140                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1141                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1142                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1143                 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1144                 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1145                 PA_TAG_INVALID);
1146     }
1147
1148     if (s->context->version >= 13) {
1149
1150         if (s->direction == PA_STREAM_PLAYBACK)
1151             pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1152         else
1153             pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1154
1155         pa_tagstruct_put(
1156                 t,
1157                 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1158                 PA_TAG_PROPLIST, s->proplist,
1159                 PA_TAG_INVALID);
1160
1161         if (s->direction == PA_STREAM_RECORD)
1162             pa_tagstruct_putu32(t, s->direct_on_input);
1163     }
1164
1165     if (s->context->version >= 14) {
1166
1167         if (s->direction == PA_STREAM_PLAYBACK)
1168             pa_tagstruct_put_boolean(t, volume_set);
1169
1170         pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1171     }
1172
1173     if (s->context->version >= 15) {
1174
1175         if (s->direction == PA_STREAM_PLAYBACK)
1176             pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1177
1178         pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1179         pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1180     }
1181
1182     if (s->context->version >= 17) {
1183
1184         if (s->direction == PA_STREAM_PLAYBACK)
1185             pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1186
1187     }
1188
1189     pa_pstream_send_tagstruct(s->context->pstream, t);
1190     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1191
1192     pa_stream_set_state(s, PA_STREAM_CREATING);
1193
1194     pa_stream_unref(s);
1195     return 0;
1196 }
1197
1198 int pa_stream_connect_playback(
1199         pa_stream *s,
1200         const char *dev,
1201         const pa_buffer_attr *attr,
1202         pa_stream_flags_t flags,
1203         const pa_cvolume *volume,
1204         pa_stream *sync_stream) {
1205
1206     pa_assert(s);
1207     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1208
1209     return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1210 }
1211
1212 int pa_stream_connect_record(
1213         pa_stream *s,
1214         const char *dev,
1215         const pa_buffer_attr *attr,
1216         pa_stream_flags_t flags) {
1217
1218     pa_assert(s);
1219     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1220
1221     return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1222 }
1223
1224 int pa_stream_begin_write(
1225         pa_stream *s,
1226         void **data,
1227         size_t *nbytes) {
1228
1229     pa_assert(s);
1230     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1231
1232     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1233     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1234     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1235     PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1236     PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1237
1238     if (*nbytes != (size_t) -1) {
1239         size_t m, fs;
1240
1241         m = pa_mempool_block_size_max(s->context->mempool);
1242         fs = pa_frame_size(&s->sample_spec);
1243
1244         m = (m / fs) * fs;
1245         if (*nbytes > m)
1246             *nbytes = m;
1247     }
1248
1249     if (!s->write_memblock) {
1250         s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1251         s->write_data = pa_memblock_acquire(s->write_memblock);
1252     }
1253
1254     *data = s->write_data;
1255     *nbytes = pa_memblock_get_length(s->write_memblock);
1256
1257     return 0;
1258 }
1259
1260 int pa_stream_cancel_write(
1261         pa_stream *s) {
1262
1263     pa_assert(s);
1264     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1265
1266     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1267     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1268     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1269     PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1270
1271     pa_assert(s->write_data);
1272
1273     pa_memblock_release(s->write_memblock);
1274     pa_memblock_unref(s->write_memblock);
1275     s->write_memblock = NULL;
1276     s->write_data = NULL;
1277
1278     return 0;
1279 }
1280
1281 int pa_stream_write(
1282         pa_stream *s,
1283         const void *data,
1284         size_t length,
1285         pa_free_cb_t free_cb,
1286         int64_t offset,
1287         pa_seek_mode_t seek) {
1288
1289     pa_assert(s);
1290     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1291     pa_assert(data);
1292
1293     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1294     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1295     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1296     PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1297     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1298     PA_CHECK_VALIDITY(s->context,
1299                       !s->write_memblock ||
1300                       ((data >= s->write_data) &&
1301                        ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1302                       PA_ERR_INVALID);
1303     PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1304
1305     if (s->write_memblock) {
1306         pa_memchunk chunk;
1307
1308         /* pa_stream_write_begin() was called before */
1309
1310         pa_memblock_release(s->write_memblock);
1311
1312         chunk.memblock = s->write_memblock;
1313         chunk.index = (const char *) data - (const char *) s->write_data;
1314         chunk.length = length;
1315
1316         s->write_memblock = NULL;
1317         s->write_data = NULL;
1318
1319         pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1320         pa_memblock_unref(chunk.memblock);
1321
1322     } else {
1323         pa_seek_mode_t t_seek = seek;
1324         int64_t t_offset = offset;
1325         size_t t_length = length;
1326         const void *t_data = data;
1327
1328         /* pa_stream_write_begin() was not called before */
1329
1330         while (t_length > 0) {
1331             pa_memchunk chunk;
1332
1333             chunk.index = 0;
1334
1335             if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1336                 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1337                 chunk.length = t_length;
1338             } else {
1339                 void *d;
1340
1341                 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1342                 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1343
1344                 d = pa_memblock_acquire(chunk.memblock);
1345                 memcpy(d, t_data, chunk.length);
1346                 pa_memblock_release(chunk.memblock);
1347             }
1348
1349             pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1350
1351             t_offset = 0;
1352             t_seek = PA_SEEK_RELATIVE;
1353
1354             t_data = (const uint8_t*) t_data + chunk.length;
1355             t_length -= chunk.length;
1356
1357             pa_memblock_unref(chunk.memblock);
1358         }
1359
1360         if (free_cb && pa_pstream_get_shm(s->context->pstream))
1361             free_cb((void*) data);
1362     }
1363
1364     /* This is obviously wrong since we ignore the seeking index . But
1365      * that's OK, the server side applies the same error */
1366     s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1367
1368     /* pa_log("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes); */
1369
1370     if (s->direction == PA_STREAM_PLAYBACK) {
1371
1372         /* Update latency request correction */
1373         if (s->write_index_corrections[s->current_write_index_correction].valid) {
1374
1375             if (seek == PA_SEEK_ABSOLUTE) {
1376                 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1377                 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1378                 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1379             } else if (seek == PA_SEEK_RELATIVE) {
1380                 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1381                     s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1382             } else
1383                 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1384         }
1385
1386         /* Update the write index in the already available latency data */
1387         if (s->timing_info_valid) {
1388
1389             if (seek == PA_SEEK_ABSOLUTE) {
1390                 s->timing_info.write_index_corrupt = FALSE;
1391                 s->timing_info.write_index = offset + (int64_t) length;
1392             } else if (seek == PA_SEEK_RELATIVE) {
1393                 if (!s->timing_info.write_index_corrupt)
1394                     s->timing_info.write_index += offset + (int64_t) length;
1395             } else
1396                 s->timing_info.write_index_corrupt = TRUE;
1397         }
1398
1399         if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1400             request_auto_timing_update(s, TRUE);
1401     }
1402
1403     return 0;
1404 }
1405
1406 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1407     pa_assert(s);
1408     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1409     pa_assert(data);
1410     pa_assert(length);
1411
1412     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1413     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1414     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1415
1416     if (!s->peek_memchunk.memblock) {
1417
1418         if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1419             *data = NULL;
1420             *length = 0;
1421             return 0;
1422         }
1423
1424         s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1425     }
1426
1427     pa_assert(s->peek_data);
1428     *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1429     *length = s->peek_memchunk.length;
1430     return 0;
1431 }
1432
1433 int pa_stream_drop(pa_stream *s) {
1434     pa_assert(s);
1435     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1436
1437     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1438     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1439     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1440     PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1441
1442     pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1443
1444     /* Fix the simulated local read index */
1445     if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1446         s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1447
1448     pa_assert(s->peek_data);
1449     pa_memblock_release(s->peek_memchunk.memblock);
1450     pa_memblock_unref(s->peek_memchunk.memblock);
1451     pa_memchunk_reset(&s->peek_memchunk);
1452
1453     return 0;
1454 }
1455
1456 size_t pa_stream_writable_size(pa_stream *s) {
1457     pa_assert(s);
1458     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1459
1460     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1461     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1462     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1463
1464     return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1465 }
1466
1467 size_t pa_stream_readable_size(pa_stream *s) {
1468     pa_assert(s);
1469     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1470
1471     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1472     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1473     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1474
1475     return pa_memblockq_get_length(s->record_memblockq);
1476 }
1477
1478 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1479     pa_operation *o;
1480     pa_tagstruct *t;
1481     uint32_t tag;
1482
1483     pa_assert(s);
1484     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1485
1486     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1487     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1488     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1489
1490     /* Ask for a timing update before we cork/uncork to get the best
1491      * accuracy for the transport latency suitable for the
1492      * check_smoother_status() call in the started callback */
1493     request_auto_timing_update(s, TRUE);
1494
1495     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1496
1497     t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1498     pa_tagstruct_putu32(t, s->channel);
1499     pa_pstream_send_tagstruct(s->context->pstream, t);
1500     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1501
1502     /* This might cause the read index to conitnue again, hence
1503      * let's request a timing update */
1504     request_auto_timing_update(s, TRUE);
1505
1506     return o;
1507 }
1508
1509 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1510     pa_usec_t usec;
1511
1512     pa_assert(s);
1513     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1514     pa_assert(s->state == PA_STREAM_READY);
1515     pa_assert(s->direction != PA_STREAM_UPLOAD);
1516     pa_assert(s->timing_info_valid);
1517     pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1518     pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1519
1520     if (s->direction == PA_STREAM_PLAYBACK) {
1521         /* The last byte that was written into the output device
1522          * had this time value associated */
1523         usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1524
1525         if (!s->corked && !s->suspended) {
1526
1527             if (!ignore_transport)
1528                 /* Because the latency info took a little time to come
1529                  * to us, we assume that the real output time is actually
1530                  * a little ahead */
1531                 usec += s->timing_info.transport_usec;
1532
1533             /* However, the output device usually maintains a buffer
1534                too, hence the real sample currently played is a little
1535                back  */
1536             if (s->timing_info.sink_usec >= usec)
1537                 usec = 0;
1538             else
1539                 usec -= s->timing_info.sink_usec;
1540         }
1541
1542     } else {
1543         pa_assert(s->direction == PA_STREAM_RECORD);
1544
1545         /* The last byte written into the server side queue had
1546          * this time value associated */
1547         usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1548
1549         if (!s->corked && !s->suspended) {
1550
1551             if (!ignore_transport)
1552                 /* Add transport latency */
1553                 usec += s->timing_info.transport_usec;
1554
1555             /* Add latency of data in device buffer */
1556             usec += s->timing_info.source_usec;
1557
1558             /* If this is a monitor source, we need to correct the
1559              * time by the playback device buffer */
1560             if (s->timing_info.sink_usec >= usec)
1561                 usec = 0;
1562             else
1563                 usec -= s->timing_info.sink_usec;
1564         }
1565     }
1566
1567     return usec;
1568 }
1569
1570 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1571     pa_operation *o = userdata;
1572     struct timeval local, remote, now;
1573     pa_timing_info *i;
1574     pa_bool_t playing = FALSE;
1575     uint64_t underrun_for = 0, playing_for = 0;
1576
1577     pa_assert(pd);
1578     pa_assert(o);
1579     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1580
1581     if (!o->context || !o->stream)
1582         goto finish;
1583
1584     i = &o->stream->timing_info;
1585
1586     o->stream->timing_info_valid = FALSE;
1587     i->write_index_corrupt = TRUE;
1588     i->read_index_corrupt = TRUE;
1589
1590     if (command != PA_COMMAND_REPLY) {
1591         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1592             goto finish;
1593
1594     } else {
1595
1596         if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1597             pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1598             pa_tagstruct_get_boolean(t, &playing) < 0 ||
1599             pa_tagstruct_get_timeval(t, &local) < 0 ||
1600             pa_tagstruct_get_timeval(t, &remote) < 0 ||
1601             pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1602             pa_tagstruct_gets64(t, &i->read_index) < 0) {
1603
1604             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1605             goto finish;
1606         }
1607
1608         if (o->context->version >= 13 &&
1609             o->stream->direction == PA_STREAM_PLAYBACK)
1610             if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1611                 pa_tagstruct_getu64(t, &playing_for) < 0) {
1612
1613                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1614                 goto finish;
1615             }
1616
1617
1618         if (!pa_tagstruct_eof(t)) {
1619             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1620             goto finish;
1621         }
1622         o->stream->timing_info_valid = TRUE;
1623         i->write_index_corrupt = FALSE;
1624         i->read_index_corrupt = FALSE;
1625
1626         i->playing = (int) playing;
1627         i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1628
1629         pa_gettimeofday(&now);
1630
1631         /* Calculcate timestamps */
1632         if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1633             /* local and remote seem to have synchronized clocks */
1634
1635             if (o->stream->direction == PA_STREAM_PLAYBACK)
1636                 i->transport_usec = pa_timeval_diff(&remote, &local);
1637             else
1638                 i->transport_usec = pa_timeval_diff(&now, &remote);
1639
1640             i->synchronized_clocks = TRUE;
1641             i->timestamp = remote;
1642         } else {
1643             /* clocks are not synchronized, let's estimate latency then */
1644             i->transport_usec = pa_timeval_diff(&now, &local)/2;
1645             i->synchronized_clocks = FALSE;
1646             i->timestamp = local;
1647             pa_timeval_add(&i->timestamp, i->transport_usec);
1648         }
1649
1650         /* Invalidate read and write indexes if necessary */
1651         if (tag < o->stream->read_index_not_before)
1652             i->read_index_corrupt = TRUE;
1653
1654         if (tag < o->stream->write_index_not_before)
1655             i->write_index_corrupt = TRUE;
1656
1657         if (o->stream->direction == PA_STREAM_PLAYBACK) {
1658             /* Write index correction */
1659
1660             int n, j;
1661             uint32_t ctag = tag;
1662
1663             /* Go through the saved correction values and add up the
1664              * total correction.*/
1665             for (n = 0, j = o->stream->current_write_index_correction+1;
1666                  n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1667                  n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1668
1669                 /* Step over invalid data or out-of-date data */
1670                 if (!o->stream->write_index_corrections[j].valid ||
1671                     o->stream->write_index_corrections[j].tag < ctag)
1672                     continue;
1673
1674                 /* Make sure that everything is in order */
1675                 ctag = o->stream->write_index_corrections[j].tag+1;
1676
1677                 /* Now fix the write index */
1678                 if (o->stream->write_index_corrections[j].corrupt) {
1679                     /* A corrupting seek was made */
1680                     i->write_index_corrupt = TRUE;
1681                 } else if (o->stream->write_index_corrections[j].absolute) {
1682                     /* An absolute seek was made */
1683                     i->write_index = o->stream->write_index_corrections[j].value;
1684                     i->write_index_corrupt = FALSE;
1685                 } else if (!i->write_index_corrupt) {
1686                     /* A relative seek was made */
1687                     i->write_index += o->stream->write_index_corrections[j].value;
1688                 }
1689             }
1690
1691             /* Clear old correction entries */
1692             for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1693                 if (!o->stream->write_index_corrections[n].valid)
1694                     continue;
1695
1696                 if (o->stream->write_index_corrections[n].tag <= tag)
1697                     o->stream->write_index_corrections[n].valid = FALSE;
1698             }
1699         }
1700
1701         if (o->stream->direction == PA_STREAM_RECORD) {
1702             /* Read index correction */
1703
1704             if (!i->read_index_corrupt)
1705                 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1706         }
1707
1708         /* Update smoother */
1709         if (o->stream->smoother) {
1710             pa_usec_t u, x;
1711
1712             u = x = pa_rtclock_now() - i->transport_usec;
1713
1714             if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1715                 pa_usec_t su;
1716
1717                 /* If we weren't playing then it will take some time
1718                  * until the audio will actually come out through the
1719                  * speakers. Since we follow that timing here, we need
1720                  * to try to fix this up */
1721
1722                 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1723
1724                 if (su < i->sink_usec)
1725                     x += i->sink_usec - su;
1726             }
1727
1728             if (!i->playing)
1729                 pa_smoother_pause(o->stream->smoother, x);
1730
1731             /* Update the smoother */
1732             if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1733                 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1734                 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1735
1736             if (i->playing)
1737                 pa_smoother_resume(o->stream->smoother, x, TRUE);
1738         }
1739     }
1740
1741     o->stream->auto_timing_update_requested = FALSE;
1742
1743     if (o->stream->latency_update_callback)
1744         o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1745
1746     if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1747         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1748         cb(o->stream, o->stream->timing_info_valid, o->userdata);
1749     }
1750
1751 finish:
1752
1753     pa_operation_done(o);
1754     pa_operation_unref(o);
1755 }
1756
1757 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1758     uint32_t tag;
1759     pa_operation *o;
1760     pa_tagstruct *t;
1761     struct timeval now;
1762     int cidx = 0;
1763
1764     pa_assert(s);
1765     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1766
1767     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1768     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1769     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1770
1771     if (s->direction == PA_STREAM_PLAYBACK) {
1772         /* Find a place to store the write_index correction data for this entry */
1773         cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1774
1775         /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1776         PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1777     }
1778     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1779
1780     t = pa_tagstruct_command(
1781             s->context,
1782             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1783             &tag);
1784     pa_tagstruct_putu32(t, s->channel);
1785     pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1786
1787     pa_pstream_send_tagstruct(s->context->pstream, t);
1788     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1789
1790     if (s->direction == PA_STREAM_PLAYBACK) {
1791         /* Fill in initial correction data */
1792
1793         s->current_write_index_correction = cidx;
1794
1795         s->write_index_corrections[cidx].valid = TRUE;
1796         s->write_index_corrections[cidx].absolute = FALSE;
1797         s->write_index_corrections[cidx].corrupt = FALSE;
1798         s->write_index_corrections[cidx].tag = tag;
1799         s->write_index_corrections[cidx].value = 0;
1800     }
1801
1802     return o;
1803 }
1804
1805 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1806     pa_stream *s = userdata;
1807
1808     pa_assert(pd);
1809     pa_assert(s);
1810     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1811
1812     pa_stream_ref(s);
1813
1814     if (command != PA_COMMAND_REPLY) {
1815         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1816             goto finish;
1817
1818         pa_stream_set_state(s, PA_STREAM_FAILED);
1819         goto finish;
1820     } else if (!pa_tagstruct_eof(t)) {
1821         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1822         goto finish;
1823     }
1824
1825     pa_stream_set_state(s, PA_STREAM_TERMINATED);
1826
1827 finish:
1828     pa_stream_unref(s);
1829 }
1830
1831 int pa_stream_disconnect(pa_stream *s) {
1832     pa_tagstruct *t;
1833     uint32_t tag;
1834
1835     pa_assert(s);
1836     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1837
1838     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1839     PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1840     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1841
1842     pa_stream_ref(s);
1843
1844     t = pa_tagstruct_command(
1845             s->context,
1846             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1847                         (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1848             &tag);
1849     pa_tagstruct_putu32(t, s->channel);
1850     pa_pstream_send_tagstruct(s->context->pstream, t);
1851     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1852
1853     pa_stream_unref(s);
1854     return 0;
1855 }
1856
1857 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1858     pa_assert(s);
1859     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1860
1861     if (pa_detect_fork())
1862         return;
1863
1864     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1865         return;
1866
1867     s->read_callback = cb;
1868     s->read_userdata = userdata;
1869 }
1870
1871 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1872     pa_assert(s);
1873     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1874
1875     if (pa_detect_fork())
1876         return;
1877
1878     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1879         return;
1880
1881     s->write_callback = cb;
1882     s->write_userdata = userdata;
1883 }
1884
1885 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1886     pa_assert(s);
1887     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1888
1889     if (pa_detect_fork())
1890         return;
1891
1892     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1893         return;
1894
1895     s->state_callback = cb;
1896     s->state_userdata = userdata;
1897 }
1898
1899 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1900     pa_assert(s);
1901     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1902
1903     if (pa_detect_fork())
1904         return;
1905
1906     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1907         return;
1908
1909     s->overflow_callback = cb;
1910     s->overflow_userdata = userdata;
1911 }
1912
1913 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1914     pa_assert(s);
1915     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1916
1917     if (pa_detect_fork())
1918         return;
1919
1920     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1921         return;
1922
1923     s->underflow_callback = cb;
1924     s->underflow_userdata = userdata;
1925 }
1926
1927 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1928     pa_assert(s);
1929     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1930
1931     if (pa_detect_fork())
1932         return;
1933
1934     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1935         return;
1936
1937     s->latency_update_callback = cb;
1938     s->latency_update_userdata = userdata;
1939 }
1940
1941 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1942     pa_assert(s);
1943     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1944
1945     if (pa_detect_fork())
1946         return;
1947
1948     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1949         return;
1950
1951     s->moved_callback = cb;
1952     s->moved_userdata = userdata;
1953 }
1954
1955 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1956     pa_assert(s);
1957     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1958
1959     if (pa_detect_fork())
1960         return;
1961
1962     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1963         return;
1964
1965     s->suspended_callback = cb;
1966     s->suspended_userdata = userdata;
1967 }
1968
1969 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1970     pa_assert(s);
1971     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1972
1973     if (pa_detect_fork())
1974         return;
1975
1976     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1977         return;
1978
1979     s->started_callback = cb;
1980     s->started_userdata = userdata;
1981 }
1982
1983 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
1984     pa_assert(s);
1985     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1986
1987     if (pa_detect_fork())
1988         return;
1989
1990     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1991         return;
1992
1993     s->event_callback = cb;
1994     s->event_userdata = userdata;
1995 }
1996
1997 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1998     pa_assert(s);
1999     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2000
2001     if (pa_detect_fork())
2002         return;
2003
2004     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2005         return;
2006
2007     s->buffer_attr_callback = cb;
2008     s->buffer_attr_userdata = userdata;
2009 }
2010
2011 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2012     pa_operation *o = userdata;
2013     int success = 1;
2014
2015     pa_assert(pd);
2016     pa_assert(o);
2017     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2018
2019     if (!o->context)
2020         goto finish;
2021
2022     if (command != PA_COMMAND_REPLY) {
2023         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2024             goto finish;
2025
2026         success = 0;
2027     } else if (!pa_tagstruct_eof(t)) {
2028         pa_context_fail(o->context, PA_ERR_PROTOCOL);
2029         goto finish;
2030     }
2031
2032     if (o->callback) {
2033         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2034         cb(o->stream, success, o->userdata);
2035     }
2036
2037 finish:
2038     pa_operation_done(o);
2039     pa_operation_unref(o);
2040 }
2041
2042 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2043     pa_operation *o;
2044     pa_tagstruct *t;
2045     uint32_t tag;
2046
2047     pa_assert(s);
2048     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2049
2050     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2051     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2052     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2053
2054     /* Ask for a timing update before we cork/uncork to get the best
2055      * accuracy for the transport latency suitable for the
2056      * check_smoother_status() call in the started callback */
2057     request_auto_timing_update(s, TRUE);
2058
2059     s->corked = b;
2060
2061     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2062
2063     t = pa_tagstruct_command(
2064             s->context,
2065             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2066             &tag);
2067     pa_tagstruct_putu32(t, s->channel);
2068     pa_tagstruct_put_boolean(t, !!b);
2069     pa_pstream_send_tagstruct(s->context->pstream, t);
2070     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2071
2072     check_smoother_status(s, FALSE, FALSE, FALSE);
2073
2074     /* This might cause the indexes to hang/start again, hence let's
2075      * request a timing update, after the cork/uncork, too */
2076     request_auto_timing_update(s, TRUE);
2077
2078     return o;
2079 }
2080
2081 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2082     pa_tagstruct *t;
2083     pa_operation *o;
2084     uint32_t tag;
2085
2086     pa_assert(s);
2087     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2088
2089     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2090     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2091
2092     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2093
2094     t = pa_tagstruct_command(s->context, command, &tag);
2095     pa_tagstruct_putu32(t, s->channel);
2096     pa_pstream_send_tagstruct(s->context->pstream, t);
2097     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2098
2099     return o;
2100 }
2101
2102 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2103     pa_operation *o;
2104
2105     pa_assert(s);
2106     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2107
2108     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2109     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2110     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2111
2112     /* Ask for a timing update *before* the flush, so that the
2113      * transport usec is as up to date as possible when we get the
2114      * underflow message and update the smoother status*/
2115     request_auto_timing_update(s, TRUE);
2116
2117     if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2118         return NULL;
2119
2120     if (s->direction == PA_STREAM_PLAYBACK) {
2121
2122         if (s->write_index_corrections[s->current_write_index_correction].valid)
2123             s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2124
2125         if (s->buffer_attr.prebuf > 0)
2126             check_smoother_status(s, FALSE, FALSE, TRUE);
2127
2128         /* This will change the write index, but leave the
2129          * read index untouched. */
2130         invalidate_indexes(s, FALSE, TRUE);
2131
2132     } else
2133         /* For record streams this has no influence on the write
2134          * index, but the read index might jump. */
2135         invalidate_indexes(s, TRUE, FALSE);
2136
2137     return o;
2138 }
2139
2140 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2141     pa_operation *o;
2142
2143     pa_assert(s);
2144     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2145
2146     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2147     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2148     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2149     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2150
2151     /* Ask for a timing update before we cork/uncork to get the best
2152      * accuracy for the transport latency suitable for the
2153      * check_smoother_status() call in the started callback */
2154     request_auto_timing_update(s, TRUE);
2155
2156     if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2157         return NULL;
2158
2159     /* This might cause the read index to hang again, hence
2160      * let's request a timing update */
2161     request_auto_timing_update(s, TRUE);
2162
2163     return o;
2164 }
2165
2166 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2167     pa_operation *o;
2168
2169     pa_assert(s);
2170     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2171
2172     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2173     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2174     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2175     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2176
2177     /* Ask for a timing update before we cork/uncork to get the best
2178      * accuracy for the transport latency suitable for the
2179      * check_smoother_status() call in the started callback */
2180     request_auto_timing_update(s, TRUE);
2181
2182     if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2183         return NULL;
2184
2185     /* This might cause the read index to start moving again, hence
2186      * let's request a timing update */
2187     request_auto_timing_update(s, TRUE);
2188
2189     return o;
2190 }
2191
2192 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2193     pa_operation *o;
2194
2195     pa_assert(s);
2196     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2197     pa_assert(name);
2198
2199     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2200     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2201     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2202
2203     if (s->context->version >= 13) {
2204         pa_proplist *p = pa_proplist_new();
2205
2206         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2207         o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2208         pa_proplist_free(p);
2209     } else {
2210         pa_tagstruct *t;
2211         uint32_t tag;
2212
2213         o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2214         t = pa_tagstruct_command(
2215                 s->context,
2216                 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2217                 &tag);
2218         pa_tagstruct_putu32(t, s->channel);
2219         pa_tagstruct_puts(t, name);
2220         pa_pstream_send_tagstruct(s->context->pstream, t);
2221         pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2222     }
2223
2224     return o;
2225 }
2226
2227 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2228     pa_usec_t usec;
2229
2230     pa_assert(s);
2231     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2232
2233     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2234     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2235     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2236     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2237     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2238     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2239
2240     if (s->smoother)
2241         usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2242     else
2243         usec = calc_time(s, FALSE);
2244
2245     /* Make sure the time runs monotonically */
2246     if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2247         if (usec < s->previous_time)
2248             usec = s->previous_time;
2249         else
2250             s->previous_time = usec;
2251     }
2252
2253     if (r_usec)
2254         *r_usec = usec;
2255
2256     return 0;
2257 }
2258
2259 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2260     pa_assert(s);
2261     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2262
2263     if (negative)
2264         *negative = 0;
2265
2266     if (a >= b)
2267         return a-b;
2268     else {
2269         if (negative && s->direction == PA_STREAM_RECORD) {
2270             *negative = 1;
2271             return b-a;
2272         } else
2273             return 0;
2274     }
2275 }
2276
2277 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2278     pa_usec_t t, c;
2279     int r;
2280     int64_t cindex;
2281
2282     pa_assert(s);
2283     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2284     pa_assert(r_usec);
2285
2286     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2287     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2288     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2289     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2290     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2291     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2292
2293     if ((r = pa_stream_get_time(s, &t)) < 0)
2294         return r;
2295
2296     if (s->direction == PA_STREAM_PLAYBACK)
2297         cindex = s->timing_info.write_index;
2298     else
2299         cindex = s->timing_info.read_index;
2300
2301     if (cindex < 0)
2302         cindex = 0;
2303
2304     c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2305
2306     if (s->direction == PA_STREAM_PLAYBACK)
2307         *r_usec = time_counter_diff(s, c, t, negative);
2308     else
2309         *r_usec = time_counter_diff(s, t, c, negative);
2310
2311     return 0;
2312 }
2313
2314 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2315     pa_assert(s);
2316     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2317
2318     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2319     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2320     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2321     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2322
2323     return &s->timing_info;
2324 }
2325
2326 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2327     pa_assert(s);
2328     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2329
2330     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2331
2332     return &s->sample_spec;
2333 }
2334
2335 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2336     pa_assert(s);
2337     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2338
2339     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2340
2341     return &s->channel_map;
2342 }
2343
2344 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2345     pa_assert(s);
2346     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2347
2348     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2349     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2350     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2351
2352     return &s->buffer_attr;
2353 }
2354
2355 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2356     pa_operation *o = userdata;
2357     int success = 1;
2358
2359     pa_assert(pd);
2360     pa_assert(o);
2361     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2362
2363     if (!o->context)
2364         goto finish;
2365
2366     if (command != PA_COMMAND_REPLY) {
2367         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2368             goto finish;
2369
2370         success = 0;
2371     } else {
2372         if (o->stream->direction == PA_STREAM_PLAYBACK) {
2373             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2374                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2375                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2376                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2377                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2378                 goto finish;
2379             }
2380         } else if (o->stream->direction == PA_STREAM_RECORD) {
2381             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2382                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2383                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2384                 goto finish;
2385             }
2386         }
2387
2388         if (o->stream->context->version >= 13) {
2389             pa_usec_t usec;
2390
2391             if (pa_tagstruct_get_usec(t, &usec) < 0) {
2392                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2393                 goto finish;
2394             }
2395
2396             if (o->stream->direction == PA_STREAM_RECORD)
2397                 o->stream->timing_info.configured_source_usec = usec;
2398             else
2399                 o->stream->timing_info.configured_sink_usec = usec;
2400         }
2401
2402         if (!pa_tagstruct_eof(t)) {
2403             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2404             goto finish;
2405         }
2406     }
2407
2408     if (o->callback) {
2409         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2410         cb(o->stream, success, o->userdata);
2411     }
2412
2413 finish:
2414     pa_operation_done(o);
2415     pa_operation_unref(o);
2416 }
2417
2418
2419 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2420     pa_operation *o;
2421     pa_tagstruct *t;
2422     uint32_t tag;
2423
2424     pa_assert(s);
2425     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2426     pa_assert(attr);
2427
2428     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2429     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2430     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2431     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2432
2433     /* Ask for a timing update before we cork/uncork to get the best
2434      * accuracy for the transport latency suitable for the
2435      * check_smoother_status() call in the started callback */
2436     request_auto_timing_update(s, TRUE);
2437
2438     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2439
2440     t = pa_tagstruct_command(
2441             s->context,
2442             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2443             &tag);
2444     pa_tagstruct_putu32(t, s->channel);
2445
2446     pa_tagstruct_putu32(t, attr->maxlength);
2447
2448     if (s->direction == PA_STREAM_PLAYBACK)
2449         pa_tagstruct_put(
2450                 t,
2451                 PA_TAG_U32, attr->tlength,
2452                 PA_TAG_U32, attr->prebuf,
2453                 PA_TAG_U32, attr->minreq,
2454                 PA_TAG_INVALID);
2455     else
2456         pa_tagstruct_putu32(t, attr->fragsize);
2457
2458     if (s->context->version >= 13)
2459         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2460
2461     if (s->context->version >= 14)
2462         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2463
2464     pa_pstream_send_tagstruct(s->context->pstream, t);
2465     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2466
2467     /* This might cause changes in the read/write indexex, hence let's
2468      * request a timing update */
2469     request_auto_timing_update(s, TRUE);
2470
2471     return o;
2472 }
2473
2474 uint32_t pa_stream_get_device_index(pa_stream *s) {
2475     pa_assert(s);
2476     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2477
2478     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2479     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2480     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2481     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2482     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2483
2484     return s->device_index;
2485 }
2486
2487 const char *pa_stream_get_device_name(pa_stream *s) {
2488     pa_assert(s);
2489     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2490
2491     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2492     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2493     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2494     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2495     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2496
2497     return s->device_name;
2498 }
2499
2500 int pa_stream_is_suspended(pa_stream *s) {
2501     pa_assert(s);
2502     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2503
2504     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2505     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2506     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2507     PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2508
2509     return s->suspended;
2510 }
2511
2512 int pa_stream_is_corked(pa_stream *s) {
2513     pa_assert(s);
2514     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2515
2516     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2517     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2518     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2519
2520     return s->corked;
2521 }
2522
2523 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2524     pa_operation *o = userdata;
2525     int success = 1;
2526
2527     pa_assert(pd);
2528     pa_assert(o);
2529     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2530
2531     if (!o->context)
2532         goto finish;
2533
2534     if (command != PA_COMMAND_REPLY) {
2535         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2536             goto finish;
2537
2538         success = 0;
2539     } else {
2540
2541         if (!pa_tagstruct_eof(t)) {
2542             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2543             goto finish;
2544         }
2545     }
2546
2547     o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2548     pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2549
2550     if (o->callback) {
2551         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2552         cb(o->stream, success, o->userdata);
2553     }
2554
2555 finish:
2556     pa_operation_done(o);
2557     pa_operation_unref(o);
2558 }
2559
2560
2561 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2562     pa_operation *o;
2563     pa_tagstruct *t;
2564     uint32_t tag;
2565
2566     pa_assert(s);
2567     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2568
2569     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2570     PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2571     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2572     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2573     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2574     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2575
2576     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2577     o->private = PA_UINT_TO_PTR(rate);
2578
2579     t = pa_tagstruct_command(
2580             s->context,
2581             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2582             &tag);
2583     pa_tagstruct_putu32(t, s->channel);
2584     pa_tagstruct_putu32(t, rate);
2585
2586     pa_pstream_send_tagstruct(s->context->pstream, t);
2587     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2588
2589     return o;
2590 }
2591
2592 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2593     pa_operation *o;
2594     pa_tagstruct *t;
2595     uint32_t tag;
2596
2597     pa_assert(s);
2598     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2599
2600     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2601     PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2602     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2603     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2604     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2605
2606     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2607
2608     t = pa_tagstruct_command(
2609             s->context,
2610             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2611             &tag);
2612     pa_tagstruct_putu32(t, s->channel);
2613     pa_tagstruct_putu32(t, (uint32_t) mode);
2614     pa_tagstruct_put_proplist(t, p);
2615
2616     pa_pstream_send_tagstruct(s->context->pstream, t);
2617     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2618
2619     /* Please note that we don't update s->proplist here, because we
2620      * don't export that field */
2621
2622     return o;
2623 }
2624
2625 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2626     pa_operation *o;
2627     pa_tagstruct *t;
2628     uint32_t tag;
2629     const char * const*k;
2630
2631     pa_assert(s);
2632     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2633
2634     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2635     PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2636     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2637     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2638     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2639
2640     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2641
2642     t = pa_tagstruct_command(
2643             s->context,
2644             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2645             &tag);
2646     pa_tagstruct_putu32(t, s->channel);
2647
2648     for (k = keys; *k; k++)
2649         pa_tagstruct_puts(t, *k);
2650
2651     pa_tagstruct_puts(t, NULL);
2652
2653     pa_pstream_send_tagstruct(s->context->pstream, t);
2654     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2655
2656     /* Please note that we don't update s->proplist here, because we
2657      * don't export that field */
2658
2659     return o;
2660 }
2661
2662 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2663     pa_assert(s);
2664     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2665
2666     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2667     PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2668     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2669     PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2670
2671     s->direct_on_input = sink_input_idx;
2672
2673     return 0;
2674 }
2675
2676 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2677     pa_assert(s);
2678     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2679
2680     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2681     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2682     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2683
2684     return s->direct_on_input;
2685 }