native: rework handling of seeks that depend on variables the client does not know...
[profile/ivi/pulseaudio.git] / src / pulse / stream.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <pulse/def.h>
32 #include <pulse/stream.h>
33 #include <pulse/timeval.h>
34 #include <pulse/rtclock.h>
35 #include <pulse/xmalloc.h>
36
37 #include <pulsecore/pstream-util.h>
38 #include <pulsecore/log.h>
39 #include <pulsecore/hashmap.h>
40 #include <pulsecore/macro.h>
41 #include <pulsecore/core-rtclock.h>
42 #include <pulsecore/core-util.h>
43
44 #include "fork-detect.h"
45 #include "internal.h"
46
47 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
48 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
49
50 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
51 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
52 #define SMOOTHER_MIN_HISTORY (4)
53
54 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
55     return pa_stream_new_with_proplist(c, name, ss, map, NULL);
56 }
57
58 static void reset_callbacks(pa_stream *s) {
59     s->read_callback = NULL;
60     s->read_userdata = NULL;
61     s->write_callback = NULL;
62     s->write_userdata = NULL;
63     s->state_callback = NULL;
64     s->state_userdata = NULL;
65     s->overflow_callback = NULL;
66     s->overflow_userdata = NULL;
67     s->underflow_callback = NULL;
68     s->underflow_userdata = NULL;
69     s->latency_update_callback = NULL;
70     s->latency_update_userdata = NULL;
71     s->moved_callback = NULL;
72     s->moved_userdata = NULL;
73     s->suspended_callback = NULL;
74     s->suspended_userdata = NULL;
75     s->started_callback = NULL;
76     s->started_userdata = NULL;
77     s->event_callback = NULL;
78     s->event_userdata = NULL;
79     s->buffer_attr_callback = NULL;
80     s->buffer_attr_userdata = NULL;
81 }
82
83 pa_stream *pa_stream_new_with_proplist(
84         pa_context *c,
85         const char *name,
86         const pa_sample_spec *ss,
87         const pa_channel_map *map,
88         pa_proplist *p) {
89
90     pa_stream *s;
91     int i;
92     pa_channel_map tmap;
93
94     pa_assert(c);
95     pa_assert(PA_REFCNT_VALUE(c) >= 1);
96
97     PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
98     PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
99     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
100     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
101     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
102     PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
103     PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
104
105     if (!map)
106         PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
107
108     s = pa_xnew(pa_stream, 1);
109     PA_REFCNT_INIT(s);
110     s->context = c;
111     s->mainloop = c->mainloop;
112
113     s->direction = PA_STREAM_NODIRECTION;
114     s->state = PA_STREAM_UNCONNECTED;
115     s->flags = 0;
116
117     s->sample_spec = *ss;
118     s->channel_map = *map;
119
120     s->direct_on_input = PA_INVALID_INDEX;
121
122     s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
123     if (name)
124         pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
125
126     s->channel = 0;
127     s->channel_valid = FALSE;
128     s->syncid = c->csyncid++;
129     s->stream_index = PA_INVALID_INDEX;
130
131     s->requested_bytes = 0;
132     memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
133
134     /* We initialize der target length here, so that if the user
135      * passes no explicit buffering metrics the default is similar to
136      * what older PA versions provided. */
137
138     s->buffer_attr.maxlength = (uint32_t) -1;
139     s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
140     s->buffer_attr.minreq = (uint32_t) -1;
141     s->buffer_attr.prebuf = (uint32_t) -1;
142     s->buffer_attr.fragsize = (uint32_t) -1;
143
144     s->device_index = PA_INVALID_INDEX;
145     s->device_name = NULL;
146     s->suspended = FALSE;
147     s->corked = FALSE;
148
149     s->write_memblock = NULL;
150     s->write_data = NULL;
151
152     pa_memchunk_reset(&s->peek_memchunk);
153     s->peek_data = NULL;
154     s->record_memblockq = NULL;
155
156     memset(&s->timing_info, 0, sizeof(s->timing_info));
157     s->timing_info_valid = FALSE;
158
159     s->previous_time = 0;
160
161     s->read_index_not_before = 0;
162     s->write_index_not_before = 0;
163     for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
164         s->write_index_corrections[i].valid = 0;
165     s->current_write_index_correction = 0;
166
167     s->auto_timing_update_event = NULL;
168     s->auto_timing_update_requested = FALSE;
169     s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
170
171     reset_callbacks(s);
172
173     s->smoother = NULL;
174
175     /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
176     PA_LLIST_PREPEND(pa_stream, c->streams, s);
177     pa_stream_ref(s);
178
179     return s;
180 }
181
182 static void stream_unlink(pa_stream *s) {
183     pa_operation *o, *n;
184     pa_assert(s);
185
186     if (!s->context)
187         return;
188
189     /* Detach from context */
190
191     /* Unref all operatio object that point to us */
192     for (o = s->context->operations; o; o = n) {
193         n = o->next;
194
195         if (o->stream == s)
196             pa_operation_cancel(o);
197     }
198
199     /* Drop all outstanding replies for this stream */
200     if (s->context->pdispatch)
201         pa_pdispatch_unregister_reply(s->context->pdispatch, s);
202
203     if (s->channel_valid) {
204         pa_hashmap_remove((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, PA_UINT32_TO_PTR(s->channel));
205         s->channel = 0;
206         s->channel_valid = FALSE;
207     }
208
209     PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
210     pa_stream_unref(s);
211
212     s->context = NULL;
213
214     if (s->auto_timing_update_event) {
215         pa_assert(s->mainloop);
216         s->mainloop->time_free(s->auto_timing_update_event);
217     }
218
219     reset_callbacks(s);
220 }
221
222 static void stream_free(pa_stream *s) {
223     pa_assert(s);
224
225     stream_unlink(s);
226
227     if (s->write_memblock) {
228         pa_memblock_release(s->write_memblock);
229         pa_memblock_unref(s->write_data);
230     }
231
232     if (s->peek_memchunk.memblock) {
233         if (s->peek_data)
234             pa_memblock_release(s->peek_memchunk.memblock);
235         pa_memblock_unref(s->peek_memchunk.memblock);
236     }
237
238     if (s->record_memblockq)
239         pa_memblockq_free(s->record_memblockq);
240
241     if (s->proplist)
242         pa_proplist_free(s->proplist);
243
244     if (s->smoother)
245         pa_smoother_free(s->smoother);
246
247     pa_xfree(s->device_name);
248     pa_xfree(s);
249 }
250
251 void pa_stream_unref(pa_stream *s) {
252     pa_assert(s);
253     pa_assert(PA_REFCNT_VALUE(s) >= 1);
254
255     if (PA_REFCNT_DEC(s) <= 0)
256         stream_free(s);
257 }
258
259 pa_stream* pa_stream_ref(pa_stream *s) {
260     pa_assert(s);
261     pa_assert(PA_REFCNT_VALUE(s) >= 1);
262
263     PA_REFCNT_INC(s);
264     return s;
265 }
266
267 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
268     pa_assert(s);
269     pa_assert(PA_REFCNT_VALUE(s) >= 1);
270
271     return s->state;
272 }
273
274 pa_context* pa_stream_get_context(pa_stream *s) {
275     pa_assert(s);
276     pa_assert(PA_REFCNT_VALUE(s) >= 1);
277
278     return s->context;
279 }
280
281 uint32_t pa_stream_get_index(pa_stream *s) {
282     pa_assert(s);
283     pa_assert(PA_REFCNT_VALUE(s) >= 1);
284
285     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
286     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
287
288     return s->stream_index;
289 }
290
291 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
292     pa_assert(s);
293     pa_assert(PA_REFCNT_VALUE(s) >= 1);
294
295     if (s->state == st)
296         return;
297
298     pa_stream_ref(s);
299
300     s->state = st;
301
302     if (s->state_callback)
303         s->state_callback(s, s->state_userdata);
304
305     if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
306         stream_unlink(s);
307
308     pa_stream_unref(s);
309 }
310
311 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
312     pa_assert(s);
313     pa_assert(PA_REFCNT_VALUE(s) >= 1);
314
315     if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
316         return;
317
318     if (s->state == PA_STREAM_READY &&
319         (force || !s->auto_timing_update_requested)) {
320         pa_operation *o;
321
322 /*         pa_log("Automatically requesting new timing data"); */
323
324         if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
325             pa_operation_unref(o);
326             s->auto_timing_update_requested = TRUE;
327         }
328     }
329
330     if (s->auto_timing_update_event) {
331         if (force)
332             s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
333
334         pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
335
336         s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
337     }
338 }
339
340 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
341     pa_context *c = userdata;
342     pa_stream *s;
343     uint32_t channel;
344
345     pa_assert(pd);
346     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
347     pa_assert(t);
348     pa_assert(c);
349     pa_assert(PA_REFCNT_VALUE(c) >= 1);
350
351     pa_context_ref(c);
352
353     if (pa_tagstruct_getu32(t, &channel) < 0 ||
354         !pa_tagstruct_eof(t)) {
355         pa_context_fail(c, PA_ERR_PROTOCOL);
356         goto finish;
357     }
358
359     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
360         goto finish;
361
362     if (s->state != PA_STREAM_READY)
363         goto finish;
364
365     pa_context_set_error(c, PA_ERR_KILLED);
366     pa_stream_set_state(s, PA_STREAM_FAILED);
367
368 finish:
369     pa_context_unref(c);
370 }
371
372 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
373     pa_usec_t x;
374
375     pa_assert(s);
376     pa_assert(!force_start || !force_stop);
377
378     if (!s->smoother)
379         return;
380
381     x = pa_rtclock_now();
382
383     if (s->timing_info_valid) {
384         if (aposteriori)
385             x -= s->timing_info.transport_usec;
386         else
387             x += s->timing_info.transport_usec;
388     }
389
390     if (s->suspended || s->corked || force_stop)
391         pa_smoother_pause(s->smoother, x);
392     else if (force_start || s->buffer_attr.prebuf == 0) {
393
394         if (!s->timing_info_valid &&
395             !aposteriori &&
396             !force_start &&
397             !force_stop &&
398             s->context->version >= 13) {
399
400             /* If the server supports STARTED events we take them as
401              * indications when audio really starts/stops playing, if
402              * we don't have any timing info yet -- instead of trying
403              * to be smart and guessing the server time. Otherwise the
404              * unknown transport delay add too much noise to our time
405              * calculations. */
406
407             return;
408         }
409
410         pa_smoother_resume(s->smoother, x, TRUE);
411     }
412
413     /* Please note that we have no idea if playback actually started
414      * if prebuf is non-zero! */
415 }
416
417 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
418     pa_context *c = userdata;
419     pa_stream *s;
420     uint32_t channel;
421     const char *dn;
422     pa_bool_t suspended;
423     uint32_t di;
424     pa_usec_t usec = 0;
425     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
426
427     pa_assert(pd);
428     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
429     pa_assert(t);
430     pa_assert(c);
431     pa_assert(PA_REFCNT_VALUE(c) >= 1);
432
433     pa_context_ref(c);
434
435     if (c->version < 12) {
436         pa_context_fail(c, PA_ERR_PROTOCOL);
437         goto finish;
438     }
439
440     if (pa_tagstruct_getu32(t, &channel) < 0 ||
441         pa_tagstruct_getu32(t, &di) < 0 ||
442         pa_tagstruct_gets(t, &dn) < 0 ||
443         pa_tagstruct_get_boolean(t, &suspended) < 0) {
444         pa_context_fail(c, PA_ERR_PROTOCOL);
445         goto finish;
446     }
447
448     if (c->version >= 13) {
449
450         if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
451             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
452                 pa_tagstruct_getu32(t, &fragsize) < 0 ||
453                 pa_tagstruct_get_usec(t, &usec) < 0) {
454                 pa_context_fail(c, PA_ERR_PROTOCOL);
455                 goto finish;
456             }
457         } else {
458             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
459                 pa_tagstruct_getu32(t, &tlength) < 0 ||
460                 pa_tagstruct_getu32(t, &prebuf) < 0 ||
461                 pa_tagstruct_getu32(t, &minreq) < 0 ||
462                 pa_tagstruct_get_usec(t, &usec) < 0) {
463                 pa_context_fail(c, PA_ERR_PROTOCOL);
464                 goto finish;
465             }
466         }
467     }
468
469     if (!pa_tagstruct_eof(t)) {
470         pa_context_fail(c, PA_ERR_PROTOCOL);
471         goto finish;
472     }
473
474     if (!dn || di == PA_INVALID_INDEX) {
475         pa_context_fail(c, PA_ERR_PROTOCOL);
476         goto finish;
477     }
478
479     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
480         goto finish;
481
482     if (s->state != PA_STREAM_READY)
483         goto finish;
484
485     if (c->version >= 13) {
486         if (s->direction == PA_STREAM_RECORD)
487             s->timing_info.configured_source_usec = usec;
488         else
489             s->timing_info.configured_sink_usec = usec;
490
491         s->buffer_attr.maxlength = maxlength;
492         s->buffer_attr.fragsize = fragsize;
493         s->buffer_attr.tlength = tlength;
494         s->buffer_attr.prebuf = prebuf;
495         s->buffer_attr.minreq = minreq;
496     }
497
498     pa_xfree(s->device_name);
499     s->device_name = pa_xstrdup(dn);
500     s->device_index = di;
501
502     s->suspended = suspended;
503
504     check_smoother_status(s, TRUE, FALSE, FALSE);
505     request_auto_timing_update(s, TRUE);
506
507     if (s->moved_callback)
508         s->moved_callback(s, s->moved_userdata);
509
510 finish:
511     pa_context_unref(c);
512 }
513
514 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
515     pa_context *c = userdata;
516     pa_stream *s;
517     uint32_t channel;
518     pa_usec_t usec = 0;
519     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
520
521     pa_assert(pd);
522     pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
523     pa_assert(t);
524     pa_assert(c);
525     pa_assert(PA_REFCNT_VALUE(c) >= 1);
526
527     pa_context_ref(c);
528
529     if (c->version < 15) {
530         pa_context_fail(c, PA_ERR_PROTOCOL);
531         goto finish;
532     }
533
534     if (pa_tagstruct_getu32(t, &channel) < 0) {
535         pa_context_fail(c, PA_ERR_PROTOCOL);
536         goto finish;
537     }
538
539     if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
540         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
541             pa_tagstruct_getu32(t, &fragsize) < 0 ||
542             pa_tagstruct_get_usec(t, &usec) < 0) {
543             pa_context_fail(c, PA_ERR_PROTOCOL);
544             goto finish;
545         }
546     } else {
547         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
548             pa_tagstruct_getu32(t, &tlength) < 0 ||
549             pa_tagstruct_getu32(t, &prebuf) < 0 ||
550             pa_tagstruct_getu32(t, &minreq) < 0 ||
551             pa_tagstruct_get_usec(t, &usec) < 0) {
552             pa_context_fail(c, PA_ERR_PROTOCOL);
553             goto finish;
554         }
555     }
556
557     if (!pa_tagstruct_eof(t)) {
558         pa_context_fail(c, PA_ERR_PROTOCOL);
559         goto finish;
560     }
561
562     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
563         goto finish;
564
565     if (s->state != PA_STREAM_READY)
566         goto finish;
567
568     if (s->direction == PA_STREAM_RECORD)
569         s->timing_info.configured_source_usec = usec;
570     else
571         s->timing_info.configured_sink_usec = usec;
572
573     s->buffer_attr.maxlength = maxlength;
574     s->buffer_attr.fragsize = fragsize;
575     s->buffer_attr.tlength = tlength;
576     s->buffer_attr.prebuf = prebuf;
577     s->buffer_attr.minreq = minreq;
578
579     request_auto_timing_update(s, TRUE);
580
581     if (s->buffer_attr_callback)
582         s->buffer_attr_callback(s, s->buffer_attr_userdata);
583
584 finish:
585     pa_context_unref(c);
586 }
587
588 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
589     pa_context *c = userdata;
590     pa_stream *s;
591     uint32_t channel;
592     pa_bool_t suspended;
593
594     pa_assert(pd);
595     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
596     pa_assert(t);
597     pa_assert(c);
598     pa_assert(PA_REFCNT_VALUE(c) >= 1);
599
600     pa_context_ref(c);
601
602     if (c->version < 12) {
603         pa_context_fail(c, PA_ERR_PROTOCOL);
604         goto finish;
605     }
606
607     if (pa_tagstruct_getu32(t, &channel) < 0 ||
608         pa_tagstruct_get_boolean(t, &suspended) < 0 ||
609         !pa_tagstruct_eof(t)) {
610         pa_context_fail(c, PA_ERR_PROTOCOL);
611         goto finish;
612     }
613
614     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
615         goto finish;
616
617     if (s->state != PA_STREAM_READY)
618         goto finish;
619
620     s->suspended = suspended;
621
622     check_smoother_status(s, TRUE, FALSE, FALSE);
623     request_auto_timing_update(s, TRUE);
624
625     if (s->suspended_callback)
626         s->suspended_callback(s, s->suspended_userdata);
627
628 finish:
629     pa_context_unref(c);
630 }
631
632 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
633     pa_context *c = userdata;
634     pa_stream *s;
635     uint32_t channel;
636
637     pa_assert(pd);
638     pa_assert(command == PA_COMMAND_STARTED);
639     pa_assert(t);
640     pa_assert(c);
641     pa_assert(PA_REFCNT_VALUE(c) >= 1);
642
643     pa_context_ref(c);
644
645     if (c->version < 13) {
646         pa_context_fail(c, PA_ERR_PROTOCOL);
647         goto finish;
648     }
649
650     if (pa_tagstruct_getu32(t, &channel) < 0 ||
651         !pa_tagstruct_eof(t)) {
652         pa_context_fail(c, PA_ERR_PROTOCOL);
653         goto finish;
654     }
655
656     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
657         goto finish;
658
659     if (s->state != PA_STREAM_READY)
660         goto finish;
661
662     check_smoother_status(s, TRUE, TRUE, FALSE);
663     request_auto_timing_update(s, TRUE);
664
665     if (s->started_callback)
666         s->started_callback(s, s->started_userdata);
667
668 finish:
669     pa_context_unref(c);
670 }
671
672 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
673     pa_context *c = userdata;
674     pa_stream *s;
675     uint32_t channel;
676     pa_proplist *pl = NULL;
677     const char *event;
678
679     pa_assert(pd);
680     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
681     pa_assert(t);
682     pa_assert(c);
683     pa_assert(PA_REFCNT_VALUE(c) >= 1);
684
685     pa_context_ref(c);
686
687     if (c->version < 15) {
688         pa_context_fail(c, PA_ERR_PROTOCOL);
689         goto finish;
690     }
691
692     pl = pa_proplist_new();
693
694     if (pa_tagstruct_getu32(t, &channel) < 0 ||
695         pa_tagstruct_gets(t, &event) < 0 ||
696         pa_tagstruct_get_proplist(t, pl) < 0 ||
697         !pa_tagstruct_eof(t) || !event) {
698         pa_context_fail(c, PA_ERR_PROTOCOL);
699         goto finish;
700     }
701
702     if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
703         goto finish;
704
705     if (s->state != PA_STREAM_READY)
706         goto finish;
707
708     if (s->event_callback)
709         s->event_callback(s, event, pl, s->event_userdata);
710
711 finish:
712     pa_context_unref(c);
713
714     if (pl)
715         pa_proplist_free(pl);
716 }
717
718 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
719     pa_stream *s;
720     pa_context *c = userdata;
721     uint32_t bytes, channel;
722
723     pa_assert(pd);
724     pa_assert(command == PA_COMMAND_REQUEST);
725     pa_assert(t);
726     pa_assert(c);
727     pa_assert(PA_REFCNT_VALUE(c) >= 1);
728
729     pa_context_ref(c);
730
731     if (pa_tagstruct_getu32(t, &channel) < 0 ||
732         pa_tagstruct_getu32(t, &bytes) < 0 ||
733         !pa_tagstruct_eof(t)) {
734         pa_context_fail(c, PA_ERR_PROTOCOL);
735         goto finish;
736     }
737
738     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
739         goto finish;
740
741     if (s->state != PA_STREAM_READY)
742         goto finish;
743
744     s->requested_bytes += bytes;
745
746     /* pa_log("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes); */
747
748     if (s->requested_bytes > 0 && s->write_callback)
749         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
750
751 finish:
752     pa_context_unref(c);
753 }
754
755 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
756     pa_stream *s;
757     pa_context *c = userdata;
758     uint32_t channel;
759
760     pa_assert(pd);
761     pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
762     pa_assert(t);
763     pa_assert(c);
764     pa_assert(PA_REFCNT_VALUE(c) >= 1);
765
766     pa_context_ref(c);
767
768     if (pa_tagstruct_getu32(t, &channel) < 0 ||
769         !pa_tagstruct_eof(t)) {
770         pa_context_fail(c, PA_ERR_PROTOCOL);
771         goto finish;
772     }
773
774     if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
775         goto finish;
776
777     if (s->state != PA_STREAM_READY)
778         goto finish;
779
780     if (s->buffer_attr.prebuf > 0)
781         check_smoother_status(s, TRUE, FALSE, TRUE);
782
783     request_auto_timing_update(s, TRUE);
784
785     if (command == PA_COMMAND_OVERFLOW) {
786         if (s->overflow_callback)
787             s->overflow_callback(s, s->overflow_userdata);
788     } else if (command == PA_COMMAND_UNDERFLOW) {
789         if (s->underflow_callback)
790             s->underflow_callback(s, s->underflow_userdata);
791     }
792
793  finish:
794     pa_context_unref(c);
795 }
796
797 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
798     pa_assert(s);
799     pa_assert(PA_REFCNT_VALUE(s) >= 1);
800
801 /*     pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
802
803     if (s->state != PA_STREAM_READY)
804         return;
805
806     if (w) {
807         s->write_index_not_before = s->context->ctag;
808
809         if (s->timing_info_valid)
810             s->timing_info.write_index_corrupt = TRUE;
811
812 /*         pa_log("write_index invalidated"); */
813     }
814
815     if (r) {
816         s->read_index_not_before = s->context->ctag;
817
818         if (s->timing_info_valid)
819             s->timing_info.read_index_corrupt = TRUE;
820
821 /*         pa_log("read_index invalidated"); */
822     }
823
824     request_auto_timing_update(s, TRUE);
825 }
826
827 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
828     pa_stream *s = userdata;
829
830     pa_assert(s);
831     pa_assert(PA_REFCNT_VALUE(s) >= 1);
832
833     pa_stream_ref(s);
834     request_auto_timing_update(s, FALSE);
835     pa_stream_unref(s);
836 }
837
838 static void create_stream_complete(pa_stream *s) {
839     pa_assert(s);
840     pa_assert(PA_REFCNT_VALUE(s) >= 1);
841     pa_assert(s->state == PA_STREAM_CREATING);
842
843     pa_stream_set_state(s, PA_STREAM_READY);
844
845     if (s->requested_bytes > 0 && s->write_callback)
846         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
847
848     if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
849         s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
850         pa_assert(!s->auto_timing_update_event);
851         s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
852
853         request_auto_timing_update(s, TRUE);
854     }
855
856     check_smoother_status(s, TRUE, FALSE, FALSE);
857 }
858
859 static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
860     const char *e;
861
862     pa_assert(s);
863     pa_assert(attr);
864
865     if ((e = getenv("PULSE_LATENCY_MSEC"))) {
866         uint32_t ms;
867
868         if (pa_atou(e, &ms) < 0 || ms <= 0)
869             pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
870         else {
871             attr->maxlength = (uint32_t) -1;
872             attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &s->sample_spec);
873             attr->minreq = (uint32_t) -1;
874             attr->prebuf = (uint32_t) -1;
875             attr->fragsize = attr->tlength;
876         }
877
878         if (flags)
879             *flags |= PA_STREAM_ADJUST_LATENCY;
880     }
881
882     if (s->context->version >= 13)
883         return;
884
885     /* Version older than 0.9.10 didn't do server side buffer_attr
886      * selection, hence we have to fake it on the client side. */
887
888     /* We choose fairly conservative values here, to not confuse
889      * old clients with extremely large playback buffers */
890
891     if (attr->maxlength == (uint32_t) -1)
892         attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
893
894     if (attr->tlength == (uint32_t) -1)
895         attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */
896
897     if (attr->minreq == (uint32_t) -1)
898         attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
899
900     if (attr->prebuf == (uint32_t) -1)
901         attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
902
903     if (attr->fragsize == (uint32_t) -1)
904         attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
905 }
906
907 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
908     pa_stream *s = userdata;
909     uint32_t requested_bytes = 0;
910
911     pa_assert(pd);
912     pa_assert(s);
913     pa_assert(PA_REFCNT_VALUE(s) >= 1);
914     pa_assert(s->state == PA_STREAM_CREATING);
915
916     pa_stream_ref(s);
917
918     if (command != PA_COMMAND_REPLY) {
919         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
920             goto finish;
921
922         pa_stream_set_state(s, PA_STREAM_FAILED);
923         goto finish;
924     }
925
926     if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
927         s->channel == PA_INVALID_INDEX ||
928         ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 ||  s->stream_index == PA_INVALID_INDEX)) ||
929         ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
930         pa_context_fail(s->context, PA_ERR_PROTOCOL);
931         goto finish;
932     }
933
934     s->requested_bytes = (int64_t) requested_bytes;
935
936     if (s->context->version >= 9) {
937         if (s->direction == PA_STREAM_PLAYBACK) {
938             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
939                 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
940                 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
941                 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
942                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
943                 goto finish;
944             }
945         } else if (s->direction == PA_STREAM_RECORD) {
946             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
947                 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
948                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
949                 goto finish;
950             }
951         }
952     }
953
954     if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
955         pa_sample_spec ss;
956         pa_channel_map cm;
957         const char *dn = NULL;
958         pa_bool_t suspended;
959
960         if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
961             pa_tagstruct_get_channel_map(t, &cm) < 0 ||
962             pa_tagstruct_getu32(t, &s->device_index) < 0 ||
963             pa_tagstruct_gets(t, &dn) < 0 ||
964             pa_tagstruct_get_boolean(t, &suspended) < 0) {
965             pa_context_fail(s->context, PA_ERR_PROTOCOL);
966             goto finish;
967         }
968
969         if (!dn || s->device_index == PA_INVALID_INDEX ||
970             ss.channels != cm.channels ||
971             !pa_channel_map_valid(&cm) ||
972             !pa_sample_spec_valid(&ss) ||
973             (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
974             (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
975             (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))) {
976             pa_context_fail(s->context, PA_ERR_PROTOCOL);
977             goto finish;
978         }
979
980         pa_xfree(s->device_name);
981         s->device_name = pa_xstrdup(dn);
982         s->suspended = suspended;
983
984         s->channel_map = cm;
985         s->sample_spec = ss;
986     }
987
988     if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
989         pa_usec_t usec;
990
991         if (pa_tagstruct_get_usec(t, &usec) < 0) {
992             pa_context_fail(s->context, PA_ERR_PROTOCOL);
993             goto finish;
994         }
995
996         if (s->direction == PA_STREAM_RECORD)
997             s->timing_info.configured_source_usec = usec;
998         else
999             s->timing_info.configured_sink_usec = usec;
1000     }
1001
1002     if (!pa_tagstruct_eof(t)) {
1003         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1004         goto finish;
1005     }
1006
1007     if (s->direction == PA_STREAM_RECORD) {
1008         pa_assert(!s->record_memblockq);
1009
1010         s->record_memblockq = pa_memblockq_new(
1011                 0,
1012                 s->buffer_attr.maxlength,
1013                 0,
1014                 pa_frame_size(&s->sample_spec),
1015                 1,
1016                 0,
1017                 0,
1018                 NULL);
1019     }
1020
1021     s->channel_valid = TRUE;
1022     pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);
1023
1024     create_stream_complete(s);
1025
1026 finish:
1027     pa_stream_unref(s);
1028 }
1029
1030 static int create_stream(
1031         pa_stream_direction_t direction,
1032         pa_stream *s,
1033         const char *dev,
1034         const pa_buffer_attr *attr,
1035         pa_stream_flags_t flags,
1036         const pa_cvolume *volume,
1037         pa_stream *sync_stream) {
1038
1039     pa_tagstruct *t;
1040     uint32_t tag;
1041     pa_bool_t volume_set = FALSE;
1042
1043     pa_assert(s);
1044     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1045     pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1046
1047     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1048     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1049     PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1050     PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1051                                               PA_STREAM_INTERPOLATE_TIMING|
1052                                               PA_STREAM_NOT_MONOTONIC|
1053                                               PA_STREAM_AUTO_TIMING_UPDATE|
1054                                               PA_STREAM_NO_REMAP_CHANNELS|
1055                                               PA_STREAM_NO_REMIX_CHANNELS|
1056                                               PA_STREAM_FIX_FORMAT|
1057                                               PA_STREAM_FIX_RATE|
1058                                               PA_STREAM_FIX_CHANNELS|
1059                                               PA_STREAM_DONT_MOVE|
1060                                               PA_STREAM_VARIABLE_RATE|
1061                                               PA_STREAM_PEAK_DETECT|
1062                                               PA_STREAM_START_MUTED|
1063                                               PA_STREAM_ADJUST_LATENCY|
1064                                               PA_STREAM_EARLY_REQUESTS|
1065                                               PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1066                                               PA_STREAM_START_UNMUTED|
1067                                               PA_STREAM_FAIL_ON_SUSPEND|
1068                                               PA_STREAM_RELATIVE_VOLUME)), PA_ERR_INVALID);
1069
1070     PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1071     PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1072     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1073     /* Althought some of the other flags are not supported on older
1074      * version, we don't check for them here, because it doesn't hurt
1075      * when they are passed but actually not supported. This makes
1076      * client development easier */
1077
1078     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1079     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1080     PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
1081     PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1082     PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1083
1084     pa_stream_ref(s);
1085
1086     s->direction = direction;
1087
1088     if (sync_stream)
1089         s->syncid = sync_stream->syncid;
1090
1091     if (attr)
1092         s->buffer_attr = *attr;
1093     patch_buffer_attr(s, &s->buffer_attr, &flags);
1094
1095     s->flags = flags;
1096     s->corked = !!(flags & PA_STREAM_START_CORKED);
1097
1098     if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1099         pa_usec_t x;
1100
1101         x = pa_rtclock_now();
1102
1103         pa_assert(!s->smoother);
1104         s->smoother = pa_smoother_new(
1105                 SMOOTHER_ADJUST_TIME,
1106                 SMOOTHER_HISTORY_TIME,
1107                 !(flags & PA_STREAM_NOT_MONOTONIC),
1108                 TRUE,
1109                 SMOOTHER_MIN_HISTORY,
1110                 x,
1111                 TRUE);
1112     }
1113
1114     if (!dev)
1115         dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1116
1117     t = pa_tagstruct_command(
1118             s->context,
1119             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1120             &tag);
1121
1122     if (s->context->version < 13)
1123         pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1124
1125     pa_tagstruct_put(
1126             t,
1127             PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1128             PA_TAG_CHANNEL_MAP, &s->channel_map,
1129             PA_TAG_U32, PA_INVALID_INDEX,
1130             PA_TAG_STRING, dev,
1131             PA_TAG_U32, s->buffer_attr.maxlength,
1132             PA_TAG_BOOLEAN, s->corked,
1133             PA_TAG_INVALID);
1134
1135     if (s->direction == PA_STREAM_PLAYBACK) {
1136         pa_cvolume cv;
1137
1138         pa_tagstruct_put(
1139                 t,
1140                 PA_TAG_U32, s->buffer_attr.tlength,
1141                 PA_TAG_U32, s->buffer_attr.prebuf,
1142                 PA_TAG_U32, s->buffer_attr.minreq,
1143                 PA_TAG_U32, s->syncid,
1144                 PA_TAG_INVALID);
1145
1146         volume_set = !!volume;
1147
1148         if (!volume)
1149             volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1150
1151         pa_tagstruct_put_cvolume(t, volume);
1152     } else
1153         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1154
1155     if (s->context->version >= 12) {
1156         pa_tagstruct_put(
1157                 t,
1158                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1159                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1160                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1161                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1162                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1163                 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1164                 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1165                 PA_TAG_INVALID);
1166     }
1167
1168     if (s->context->version >= 13) {
1169
1170         if (s->direction == PA_STREAM_PLAYBACK)
1171             pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1172         else
1173             pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1174
1175         pa_tagstruct_put(
1176                 t,
1177                 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1178                 PA_TAG_PROPLIST, s->proplist,
1179                 PA_TAG_INVALID);
1180
1181         if (s->direction == PA_STREAM_RECORD)
1182             pa_tagstruct_putu32(t, s->direct_on_input);
1183     }
1184
1185     if (s->context->version >= 14) {
1186
1187         if (s->direction == PA_STREAM_PLAYBACK)
1188             pa_tagstruct_put_boolean(t, volume_set);
1189
1190         pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1191     }
1192
1193     if (s->context->version >= 15) {
1194
1195         if (s->direction == PA_STREAM_PLAYBACK)
1196             pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1197
1198         pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1199         pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1200     }
1201
1202     if (s->context->version >= 17) {
1203
1204         if (s->direction == PA_STREAM_PLAYBACK)
1205             pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1206
1207     }
1208
1209     pa_pstream_send_tagstruct(s->context->pstream, t);
1210     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1211
1212     pa_stream_set_state(s, PA_STREAM_CREATING);
1213
1214     pa_stream_unref(s);
1215     return 0;
1216 }
1217
1218 int pa_stream_connect_playback(
1219         pa_stream *s,
1220         const char *dev,
1221         const pa_buffer_attr *attr,
1222         pa_stream_flags_t flags,
1223         const pa_cvolume *volume,
1224         pa_stream *sync_stream) {
1225
1226     pa_assert(s);
1227     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1228
1229     return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1230 }
1231
1232 int pa_stream_connect_record(
1233         pa_stream *s,
1234         const char *dev,
1235         const pa_buffer_attr *attr,
1236         pa_stream_flags_t flags) {
1237
1238     pa_assert(s);
1239     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1240
1241     return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1242 }
1243
1244 int pa_stream_begin_write(
1245         pa_stream *s,
1246         void **data,
1247         size_t *nbytes) {
1248
1249     pa_assert(s);
1250     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1251
1252     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1253     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1254     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1255     PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1256     PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1257
1258     if (*nbytes != (size_t) -1) {
1259         size_t m, fs;
1260
1261         m = pa_mempool_block_size_max(s->context->mempool);
1262         fs = pa_frame_size(&s->sample_spec);
1263
1264         m = (m / fs) * fs;
1265         if (*nbytes > m)
1266             *nbytes = m;
1267     }
1268
1269     if (!s->write_memblock) {
1270         s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1271         s->write_data = pa_memblock_acquire(s->write_memblock);
1272     }
1273
1274     *data = s->write_data;
1275     *nbytes = pa_memblock_get_length(s->write_memblock);
1276
1277     return 0;
1278 }
1279
1280 int pa_stream_cancel_write(
1281         pa_stream *s) {
1282
1283     pa_assert(s);
1284     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1285
1286     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1287     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1288     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1289     PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1290
1291     pa_assert(s->write_data);
1292
1293     pa_memblock_release(s->write_memblock);
1294     pa_memblock_unref(s->write_memblock);
1295     s->write_memblock = NULL;
1296     s->write_data = NULL;
1297
1298     return 0;
1299 }
1300
1301 int pa_stream_write(
1302         pa_stream *s,
1303         const void *data,
1304         size_t length,
1305         pa_free_cb_t free_cb,
1306         int64_t offset,
1307         pa_seek_mode_t seek) {
1308
1309     pa_assert(s);
1310     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1311     pa_assert(data);
1312
1313     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1314     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1315     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1316     PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1317     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1318     PA_CHECK_VALIDITY(s->context,
1319                       !s->write_memblock ||
1320                       ((data >= s->write_data) &&
1321                        ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1322                       PA_ERR_INVALID);
1323     PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1324
1325     if (s->write_memblock) {
1326         pa_memchunk chunk;
1327
1328         /* pa_stream_write_begin() was called before */
1329
1330         pa_memblock_release(s->write_memblock);
1331
1332         chunk.memblock = s->write_memblock;
1333         chunk.index = (const char *) data - (const char *) s->write_data;
1334         chunk.length = length;
1335
1336         s->write_memblock = NULL;
1337         s->write_data = NULL;
1338
1339         pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1340         pa_memblock_unref(chunk.memblock);
1341
1342     } else {
1343         pa_seek_mode_t t_seek = seek;
1344         int64_t t_offset = offset;
1345         size_t t_length = length;
1346         const void *t_data = data;
1347
1348         /* pa_stream_write_begin() was not called before */
1349
1350         while (t_length > 0) {
1351             pa_memchunk chunk;
1352
1353             chunk.index = 0;
1354
1355             if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1356                 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1357                 chunk.length = t_length;
1358             } else {
1359                 void *d;
1360
1361                 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1362                 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1363
1364                 d = pa_memblock_acquire(chunk.memblock);
1365                 memcpy(d, t_data, chunk.length);
1366                 pa_memblock_release(chunk.memblock);
1367             }
1368
1369             pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1370
1371             t_offset = 0;
1372             t_seek = PA_SEEK_RELATIVE;
1373
1374             t_data = (const uint8_t*) t_data + chunk.length;
1375             t_length -= chunk.length;
1376
1377             pa_memblock_unref(chunk.memblock);
1378         }
1379
1380         if (free_cb && pa_pstream_get_shm(s->context->pstream))
1381             free_cb((void*) data);
1382     }
1383
1384     /* This is obviously wrong since we ignore the seeking index . But
1385      * that's OK, the server side applies the same error */
1386     s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1387
1388     /* pa_log("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes); */
1389
1390     if (s->direction == PA_STREAM_PLAYBACK) {
1391
1392         /* Update latency request correction */
1393         if (s->write_index_corrections[s->current_write_index_correction].valid) {
1394
1395             if (seek == PA_SEEK_ABSOLUTE) {
1396                 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1397                 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1398                 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1399             } else if (seek == PA_SEEK_RELATIVE) {
1400                 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1401                     s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1402             } else
1403                 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1404         }
1405
1406         /* Update the write index in the already available latency data */
1407         if (s->timing_info_valid) {
1408
1409             if (seek == PA_SEEK_ABSOLUTE) {
1410                 s->timing_info.write_index_corrupt = FALSE;
1411                 s->timing_info.write_index = offset + (int64_t) length;
1412             } else if (seek == PA_SEEK_RELATIVE) {
1413                 if (!s->timing_info.write_index_corrupt)
1414                     s->timing_info.write_index += offset + (int64_t) length;
1415             } else
1416                 s->timing_info.write_index_corrupt = TRUE;
1417         }
1418
1419         if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1420             request_auto_timing_update(s, TRUE);
1421     }
1422
1423     return 0;
1424 }
1425
1426 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1427     pa_assert(s);
1428     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1429     pa_assert(data);
1430     pa_assert(length);
1431
1432     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1433     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1434     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1435
1436     if (!s->peek_memchunk.memblock) {
1437
1438         if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1439             *data = NULL;
1440             *length = 0;
1441             return 0;
1442         }
1443
1444         s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1445     }
1446
1447     pa_assert(s->peek_data);
1448     *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1449     *length = s->peek_memchunk.length;
1450     return 0;
1451 }
1452
1453 int pa_stream_drop(pa_stream *s) {
1454     pa_assert(s);
1455     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1456
1457     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1458     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1459     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1460     PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1461
1462     pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1463
1464     /* Fix the simulated local read index */
1465     if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1466         s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1467
1468     pa_assert(s->peek_data);
1469     pa_memblock_release(s->peek_memchunk.memblock);
1470     pa_memblock_unref(s->peek_memchunk.memblock);
1471     pa_memchunk_reset(&s->peek_memchunk);
1472
1473     return 0;
1474 }
1475
1476 size_t pa_stream_writable_size(pa_stream *s) {
1477     pa_assert(s);
1478     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1479
1480     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1481     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1482     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1483
1484     return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1485 }
1486
1487 size_t pa_stream_readable_size(pa_stream *s) {
1488     pa_assert(s);
1489     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1490
1491     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1492     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1493     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1494
1495     return pa_memblockq_get_length(s->record_memblockq);
1496 }
1497
1498 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1499     pa_operation *o;
1500     pa_tagstruct *t;
1501     uint32_t tag;
1502
1503     pa_assert(s);
1504     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1505
1506     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1507     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1508     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1509
1510     /* Ask for a timing update before we cork/uncork to get the best
1511      * accuracy for the transport latency suitable for the
1512      * check_smoother_status() call in the started callback */
1513     request_auto_timing_update(s, TRUE);
1514
1515     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1516
1517     t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1518     pa_tagstruct_putu32(t, s->channel);
1519     pa_pstream_send_tagstruct(s->context->pstream, t);
1520     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1521
1522     /* This might cause the read index to conitnue again, hence
1523      * let's request a timing update */
1524     request_auto_timing_update(s, TRUE);
1525
1526     return o;
1527 }
1528
1529 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1530     pa_usec_t usec;
1531
1532     pa_assert(s);
1533     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1534     pa_assert(s->state == PA_STREAM_READY);
1535     pa_assert(s->direction != PA_STREAM_UPLOAD);
1536     pa_assert(s->timing_info_valid);
1537     pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1538     pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1539
1540     if (s->direction == PA_STREAM_PLAYBACK) {
1541         /* The last byte that was written into the output device
1542          * had this time value associated */
1543         usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1544
1545         if (!s->corked && !s->suspended) {
1546
1547             if (!ignore_transport)
1548                 /* Because the latency info took a little time to come
1549                  * to us, we assume that the real output time is actually
1550                  * a little ahead */
1551                 usec += s->timing_info.transport_usec;
1552
1553             /* However, the output device usually maintains a buffer
1554                too, hence the real sample currently played is a little
1555                back  */
1556             if (s->timing_info.sink_usec >= usec)
1557                 usec = 0;
1558             else
1559                 usec -= s->timing_info.sink_usec;
1560         }
1561
1562     } else {
1563         pa_assert(s->direction == PA_STREAM_RECORD);
1564
1565         /* The last byte written into the server side queue had
1566          * this time value associated */
1567         usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1568
1569         if (!s->corked && !s->suspended) {
1570
1571             if (!ignore_transport)
1572                 /* Add transport latency */
1573                 usec += s->timing_info.transport_usec;
1574
1575             /* Add latency of data in device buffer */
1576             usec += s->timing_info.source_usec;
1577
1578             /* If this is a monitor source, we need to correct the
1579              * time by the playback device buffer */
1580             if (s->timing_info.sink_usec >= usec)
1581                 usec = 0;
1582             else
1583                 usec -= s->timing_info.sink_usec;
1584         }
1585     }
1586
1587     return usec;
1588 }
1589
1590 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1591     pa_operation *o = userdata;
1592     struct timeval local, remote, now;
1593     pa_timing_info *i;
1594     pa_bool_t playing = FALSE;
1595     uint64_t underrun_for = 0, playing_for = 0;
1596
1597     pa_assert(pd);
1598     pa_assert(o);
1599     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1600
1601     if (!o->context || !o->stream)
1602         goto finish;
1603
1604     i = &o->stream->timing_info;
1605
1606     o->stream->timing_info_valid = FALSE;
1607     i->write_index_corrupt = TRUE;
1608     i->read_index_corrupt = TRUE;
1609
1610     if (command != PA_COMMAND_REPLY) {
1611         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1612             goto finish;
1613
1614     } else {
1615
1616         if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1617             pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1618             pa_tagstruct_get_boolean(t, &playing) < 0 ||
1619             pa_tagstruct_get_timeval(t, &local) < 0 ||
1620             pa_tagstruct_get_timeval(t, &remote) < 0 ||
1621             pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1622             pa_tagstruct_gets64(t, &i->read_index) < 0) {
1623
1624             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1625             goto finish;
1626         }
1627
1628         if (o->context->version >= 13 &&
1629             o->stream->direction == PA_STREAM_PLAYBACK)
1630             if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1631                 pa_tagstruct_getu64(t, &playing_for) < 0) {
1632
1633                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1634                 goto finish;
1635             }
1636
1637
1638         if (!pa_tagstruct_eof(t)) {
1639             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1640             goto finish;
1641         }
1642         o->stream->timing_info_valid = TRUE;
1643         i->write_index_corrupt = FALSE;
1644         i->read_index_corrupt = FALSE;
1645
1646         i->playing = (int) playing;
1647         i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1648
1649         pa_gettimeofday(&now);
1650
1651         /* Calculcate timestamps */
1652         if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1653             /* local and remote seem to have synchronized clocks */
1654
1655             if (o->stream->direction == PA_STREAM_PLAYBACK)
1656                 i->transport_usec = pa_timeval_diff(&remote, &local);
1657             else
1658                 i->transport_usec = pa_timeval_diff(&now, &remote);
1659
1660             i->synchronized_clocks = TRUE;
1661             i->timestamp = remote;
1662         } else {
1663             /* clocks are not synchronized, let's estimate latency then */
1664             i->transport_usec = pa_timeval_diff(&now, &local)/2;
1665             i->synchronized_clocks = FALSE;
1666             i->timestamp = local;
1667             pa_timeval_add(&i->timestamp, i->transport_usec);
1668         }
1669
1670         /* Invalidate read and write indexes if necessary */
1671         if (tag < o->stream->read_index_not_before)
1672             i->read_index_corrupt = TRUE;
1673
1674         if (tag < o->stream->write_index_not_before)
1675             i->write_index_corrupt = TRUE;
1676
1677         if (o->stream->direction == PA_STREAM_PLAYBACK) {
1678             /* Write index correction */
1679
1680             int n, j;
1681             uint32_t ctag = tag;
1682
1683             /* Go through the saved correction values and add up the
1684              * total correction.*/
1685             for (n = 0, j = o->stream->current_write_index_correction+1;
1686                  n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1687                  n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1688
1689                 /* Step over invalid data or out-of-date data */
1690                 if (!o->stream->write_index_corrections[j].valid ||
1691                     o->stream->write_index_corrections[j].tag < ctag)
1692                     continue;
1693
1694                 /* Make sure that everything is in order */
1695                 ctag = o->stream->write_index_corrections[j].tag+1;
1696
1697                 /* Now fix the write index */
1698                 if (o->stream->write_index_corrections[j].corrupt) {
1699                     /* A corrupting seek was made */
1700                     i->write_index_corrupt = TRUE;
1701                 } else if (o->stream->write_index_corrections[j].absolute) {
1702                     /* An absolute seek was made */
1703                     i->write_index = o->stream->write_index_corrections[j].value;
1704                     i->write_index_corrupt = FALSE;
1705                 } else if (!i->write_index_corrupt) {
1706                     /* A relative seek was made */
1707                     i->write_index += o->stream->write_index_corrections[j].value;
1708                 }
1709             }
1710
1711             /* Clear old correction entries */
1712             for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1713                 if (!o->stream->write_index_corrections[n].valid)
1714                     continue;
1715
1716                 if (o->stream->write_index_corrections[n].tag <= tag)
1717                     o->stream->write_index_corrections[n].valid = FALSE;
1718             }
1719         }
1720
1721         if (o->stream->direction == PA_STREAM_RECORD) {
1722             /* Read index correction */
1723
1724             if (!i->read_index_corrupt)
1725                 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1726         }
1727
1728         /* Update smoother */
1729         if (o->stream->smoother) {
1730             pa_usec_t u, x;
1731
1732             u = x = pa_rtclock_now() - i->transport_usec;
1733
1734             if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1735                 pa_usec_t su;
1736
1737                 /* If we weren't playing then it will take some time
1738                  * until the audio will actually come out through the
1739                  * speakers. Since we follow that timing here, we need
1740                  * to try to fix this up */
1741
1742                 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1743
1744                 if (su < i->sink_usec)
1745                     x += i->sink_usec - su;
1746             }
1747
1748             if (!i->playing)
1749                 pa_smoother_pause(o->stream->smoother, x);
1750
1751             /* Update the smoother */
1752             if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1753                 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1754                 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1755
1756             if (i->playing)
1757                 pa_smoother_resume(o->stream->smoother, x, TRUE);
1758         }
1759     }
1760
1761     o->stream->auto_timing_update_requested = FALSE;
1762
1763     if (o->stream->latency_update_callback)
1764         o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1765
1766     if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1767         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1768         cb(o->stream, o->stream->timing_info_valid, o->userdata);
1769     }
1770
1771 finish:
1772
1773     pa_operation_done(o);
1774     pa_operation_unref(o);
1775 }
1776
1777 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1778     uint32_t tag;
1779     pa_operation *o;
1780     pa_tagstruct *t;
1781     struct timeval now;
1782     int cidx = 0;
1783
1784     pa_assert(s);
1785     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1786
1787     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1788     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1789     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1790
1791     if (s->direction == PA_STREAM_PLAYBACK) {
1792         /* Find a place to store the write_index correction data for this entry */
1793         cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1794
1795         /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1796         PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1797     }
1798     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1799
1800     t = pa_tagstruct_command(
1801             s->context,
1802             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1803             &tag);
1804     pa_tagstruct_putu32(t, s->channel);
1805     pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1806
1807     pa_pstream_send_tagstruct(s->context->pstream, t);
1808     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1809
1810     if (s->direction == PA_STREAM_PLAYBACK) {
1811         /* Fill in initial correction data */
1812
1813         s->current_write_index_correction = cidx;
1814
1815         s->write_index_corrections[cidx].valid = TRUE;
1816         s->write_index_corrections[cidx].absolute = FALSE;
1817         s->write_index_corrections[cidx].corrupt = FALSE;
1818         s->write_index_corrections[cidx].tag = tag;
1819         s->write_index_corrections[cidx].value = 0;
1820     }
1821
1822     return o;
1823 }
1824
1825 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1826     pa_stream *s = userdata;
1827
1828     pa_assert(pd);
1829     pa_assert(s);
1830     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1831
1832     pa_stream_ref(s);
1833
1834     if (command != PA_COMMAND_REPLY) {
1835         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1836             goto finish;
1837
1838         pa_stream_set_state(s, PA_STREAM_FAILED);
1839         goto finish;
1840     } else if (!pa_tagstruct_eof(t)) {
1841         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1842         goto finish;
1843     }
1844
1845     pa_stream_set_state(s, PA_STREAM_TERMINATED);
1846
1847 finish:
1848     pa_stream_unref(s);
1849 }
1850
1851 int pa_stream_disconnect(pa_stream *s) {
1852     pa_tagstruct *t;
1853     uint32_t tag;
1854
1855     pa_assert(s);
1856     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1857
1858     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1859     PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1860     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1861
1862     pa_stream_ref(s);
1863
1864     t = pa_tagstruct_command(
1865             s->context,
1866             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1867                         (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1868             &tag);
1869     pa_tagstruct_putu32(t, s->channel);
1870     pa_pstream_send_tagstruct(s->context->pstream, t);
1871     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1872
1873     pa_stream_unref(s);
1874     return 0;
1875 }
1876
1877 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1878     pa_assert(s);
1879     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1880
1881     if (pa_detect_fork())
1882         return;
1883
1884     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1885         return;
1886
1887     s->read_callback = cb;
1888     s->read_userdata = userdata;
1889 }
1890
1891 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1892     pa_assert(s);
1893     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1894
1895     if (pa_detect_fork())
1896         return;
1897
1898     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1899         return;
1900
1901     s->write_callback = cb;
1902     s->write_userdata = userdata;
1903 }
1904
1905 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1906     pa_assert(s);
1907     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1908
1909     if (pa_detect_fork())
1910         return;
1911
1912     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1913         return;
1914
1915     s->state_callback = cb;
1916     s->state_userdata = userdata;
1917 }
1918
1919 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1920     pa_assert(s);
1921     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1922
1923     if (pa_detect_fork())
1924         return;
1925
1926     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1927         return;
1928
1929     s->overflow_callback = cb;
1930     s->overflow_userdata = userdata;
1931 }
1932
1933 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1934     pa_assert(s);
1935     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1936
1937     if (pa_detect_fork())
1938         return;
1939
1940     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1941         return;
1942
1943     s->underflow_callback = cb;
1944     s->underflow_userdata = userdata;
1945 }
1946
1947 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1948     pa_assert(s);
1949     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1950
1951     if (pa_detect_fork())
1952         return;
1953
1954     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1955         return;
1956
1957     s->latency_update_callback = cb;
1958     s->latency_update_userdata = userdata;
1959 }
1960
1961 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1962     pa_assert(s);
1963     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1964
1965     if (pa_detect_fork())
1966         return;
1967
1968     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1969         return;
1970
1971     s->moved_callback = cb;
1972     s->moved_userdata = userdata;
1973 }
1974
1975 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1976     pa_assert(s);
1977     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1978
1979     if (pa_detect_fork())
1980         return;
1981
1982     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1983         return;
1984
1985     s->suspended_callback = cb;
1986     s->suspended_userdata = userdata;
1987 }
1988
1989 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1990     pa_assert(s);
1991     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1992
1993     if (pa_detect_fork())
1994         return;
1995
1996     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1997         return;
1998
1999     s->started_callback = cb;
2000     s->started_userdata = userdata;
2001 }
2002
2003 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
2004     pa_assert(s);
2005     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2006
2007     if (pa_detect_fork())
2008         return;
2009
2010     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2011         return;
2012
2013     s->event_callback = cb;
2014     s->event_userdata = userdata;
2015 }
2016
2017 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2018     pa_assert(s);
2019     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2020
2021     if (pa_detect_fork())
2022         return;
2023
2024     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2025         return;
2026
2027     s->buffer_attr_callback = cb;
2028     s->buffer_attr_userdata = userdata;
2029 }
2030
2031 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2032     pa_operation *o = userdata;
2033     int success = 1;
2034
2035     pa_assert(pd);
2036     pa_assert(o);
2037     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2038
2039     if (!o->context)
2040         goto finish;
2041
2042     if (command != PA_COMMAND_REPLY) {
2043         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2044             goto finish;
2045
2046         success = 0;
2047     } else if (!pa_tagstruct_eof(t)) {
2048         pa_context_fail(o->context, PA_ERR_PROTOCOL);
2049         goto finish;
2050     }
2051
2052     if (o->callback) {
2053         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2054         cb(o->stream, success, o->userdata);
2055     }
2056
2057 finish:
2058     pa_operation_done(o);
2059     pa_operation_unref(o);
2060 }
2061
2062 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2063     pa_operation *o;
2064     pa_tagstruct *t;
2065     uint32_t tag;
2066
2067     pa_assert(s);
2068     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2069
2070     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2071     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2072     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2073
2074     /* Ask for a timing update before we cork/uncork to get the best
2075      * accuracy for the transport latency suitable for the
2076      * check_smoother_status() call in the started callback */
2077     request_auto_timing_update(s, TRUE);
2078
2079     s->corked = b;
2080
2081     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2082
2083     t = pa_tagstruct_command(
2084             s->context,
2085             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2086             &tag);
2087     pa_tagstruct_putu32(t, s->channel);
2088     pa_tagstruct_put_boolean(t, !!b);
2089     pa_pstream_send_tagstruct(s->context->pstream, t);
2090     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2091
2092     check_smoother_status(s, FALSE, FALSE, FALSE);
2093
2094     /* This might cause the indexes to hang/start again, hence let's
2095      * request a timing update, after the cork/uncork, too */
2096     request_auto_timing_update(s, TRUE);
2097
2098     return o;
2099 }
2100
2101 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2102     pa_tagstruct *t;
2103     pa_operation *o;
2104     uint32_t tag;
2105
2106     pa_assert(s);
2107     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2108
2109     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2110     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2111
2112     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2113
2114     t = pa_tagstruct_command(s->context, command, &tag);
2115     pa_tagstruct_putu32(t, s->channel);
2116     pa_pstream_send_tagstruct(s->context->pstream, t);
2117     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2118
2119     return o;
2120 }
2121
2122 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2123     pa_operation *o;
2124
2125     pa_assert(s);
2126     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2127
2128     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2129     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2130     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2131
2132     /* Ask for a timing update *before* the flush, so that the
2133      * transport usec is as up to date as possible when we get the
2134      * underflow message and update the smoother status*/
2135     request_auto_timing_update(s, TRUE);
2136
2137     if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2138         return NULL;
2139
2140     if (s->direction == PA_STREAM_PLAYBACK) {
2141
2142         if (s->write_index_corrections[s->current_write_index_correction].valid)
2143             s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2144
2145         if (s->buffer_attr.prebuf > 0)
2146             check_smoother_status(s, FALSE, FALSE, TRUE);
2147
2148         /* This will change the write index, but leave the
2149          * read index untouched. */
2150         invalidate_indexes(s, FALSE, TRUE);
2151
2152     } else
2153         /* For record streams this has no influence on the write
2154          * index, but the read index might jump. */
2155         invalidate_indexes(s, TRUE, FALSE);
2156
2157     /* Note that we do not update requested_bytes here. This is
2158      * because we cannot really know how data actually was dropped
2159      * from the write index due to this. This 'error' will be applied
2160      * by both client and server and hence we should be fine. */
2161
2162     return o;
2163 }
2164
2165 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2166     pa_operation *o;
2167
2168     pa_assert(s);
2169     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2170
2171     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2172     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2173     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2174     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2175
2176     /* Ask for a timing update before we cork/uncork to get the best
2177      * accuracy for the transport latency suitable for the
2178      * check_smoother_status() call in the started callback */
2179     request_auto_timing_update(s, TRUE);
2180
2181     if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2182         return NULL;
2183
2184     /* This might cause the read index to hang again, hence
2185      * let's request a timing update */
2186     request_auto_timing_update(s, TRUE);
2187
2188     return o;
2189 }
2190
2191 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2192     pa_operation *o;
2193
2194     pa_assert(s);
2195     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2196
2197     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2198     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2199     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2200     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2201
2202     /* Ask for a timing update before we cork/uncork to get the best
2203      * accuracy for the transport latency suitable for the
2204      * check_smoother_status() call in the started callback */
2205     request_auto_timing_update(s, TRUE);
2206
2207     if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2208         return NULL;
2209
2210     /* This might cause the read index to start moving again, hence
2211      * let's request a timing update */
2212     request_auto_timing_update(s, TRUE);
2213
2214     return o;
2215 }
2216
2217 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2218     pa_operation *o;
2219
2220     pa_assert(s);
2221     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2222     pa_assert(name);
2223
2224     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2225     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2226     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2227
2228     if (s->context->version >= 13) {
2229         pa_proplist *p = pa_proplist_new();
2230
2231         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2232         o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2233         pa_proplist_free(p);
2234     } else {
2235         pa_tagstruct *t;
2236         uint32_t tag;
2237
2238         o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2239         t = pa_tagstruct_command(
2240                 s->context,
2241                 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2242                 &tag);
2243         pa_tagstruct_putu32(t, s->channel);
2244         pa_tagstruct_puts(t, name);
2245         pa_pstream_send_tagstruct(s->context->pstream, t);
2246         pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2247     }
2248
2249     return o;
2250 }
2251
2252 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2253     pa_usec_t usec;
2254
2255     pa_assert(s);
2256     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2257
2258     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2259     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2260     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2261     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2262     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2263     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2264
2265     if (s->smoother)
2266         usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2267     else
2268         usec = calc_time(s, FALSE);
2269
2270     /* Make sure the time runs monotonically */
2271     if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2272         if (usec < s->previous_time)
2273             usec = s->previous_time;
2274         else
2275             s->previous_time = usec;
2276     }
2277
2278     if (r_usec)
2279         *r_usec = usec;
2280
2281     return 0;
2282 }
2283
2284 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2285     pa_assert(s);
2286     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2287
2288     if (negative)
2289         *negative = 0;
2290
2291     if (a >= b)
2292         return a-b;
2293     else {
2294         if (negative && s->direction == PA_STREAM_RECORD) {
2295             *negative = 1;
2296             return b-a;
2297         } else
2298             return 0;
2299     }
2300 }
2301
2302 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2303     pa_usec_t t, c;
2304     int r;
2305     int64_t cindex;
2306
2307     pa_assert(s);
2308     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2309     pa_assert(r_usec);
2310
2311     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2312     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2313     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2314     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2315     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2316     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2317
2318     if ((r = pa_stream_get_time(s, &t)) < 0)
2319         return r;
2320
2321     if (s->direction == PA_STREAM_PLAYBACK)
2322         cindex = s->timing_info.write_index;
2323     else
2324         cindex = s->timing_info.read_index;
2325
2326     if (cindex < 0)
2327         cindex = 0;
2328
2329     c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2330
2331     if (s->direction == PA_STREAM_PLAYBACK)
2332         *r_usec = time_counter_diff(s, c, t, negative);
2333     else
2334         *r_usec = time_counter_diff(s, t, c, negative);
2335
2336     return 0;
2337 }
2338
2339 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2340     pa_assert(s);
2341     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2342
2343     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2344     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2345     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2346     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2347
2348     return &s->timing_info;
2349 }
2350
2351 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2352     pa_assert(s);
2353     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2354
2355     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2356
2357     return &s->sample_spec;
2358 }
2359
2360 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2361     pa_assert(s);
2362     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2363
2364     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2365
2366     return &s->channel_map;
2367 }
2368
2369 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2370     pa_assert(s);
2371     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2372
2373     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2374     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2375     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2376
2377     return &s->buffer_attr;
2378 }
2379
2380 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2381     pa_operation *o = userdata;
2382     int success = 1;
2383
2384     pa_assert(pd);
2385     pa_assert(o);
2386     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2387
2388     if (!o->context)
2389         goto finish;
2390
2391     if (command != PA_COMMAND_REPLY) {
2392         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2393             goto finish;
2394
2395         success = 0;
2396     } else {
2397         if (o->stream->direction == PA_STREAM_PLAYBACK) {
2398             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2399                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2400                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2401                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2402                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2403                 goto finish;
2404             }
2405         } else if (o->stream->direction == PA_STREAM_RECORD) {
2406             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2407                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2408                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2409                 goto finish;
2410             }
2411         }
2412
2413         if (o->stream->context->version >= 13) {
2414             pa_usec_t usec;
2415
2416             if (pa_tagstruct_get_usec(t, &usec) < 0) {
2417                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2418                 goto finish;
2419             }
2420
2421             if (o->stream->direction == PA_STREAM_RECORD)
2422                 o->stream->timing_info.configured_source_usec = usec;
2423             else
2424                 o->stream->timing_info.configured_sink_usec = usec;
2425         }
2426
2427         if (!pa_tagstruct_eof(t)) {
2428             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2429             goto finish;
2430         }
2431     }
2432
2433     if (o->callback) {
2434         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2435         cb(o->stream, success, o->userdata);
2436     }
2437
2438 finish:
2439     pa_operation_done(o);
2440     pa_operation_unref(o);
2441 }
2442
2443
2444 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2445     pa_operation *o;
2446     pa_tagstruct *t;
2447     uint32_t tag;
2448     pa_buffer_attr copy;
2449
2450     pa_assert(s);
2451     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2452     pa_assert(attr);
2453
2454     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2455     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2456     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2457     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2458
2459     /* Ask for a timing update before we cork/uncork to get the best
2460      * accuracy for the transport latency suitable for the
2461      * check_smoother_status() call in the started callback */
2462     request_auto_timing_update(s, TRUE);
2463
2464     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2465
2466     t = pa_tagstruct_command(
2467             s->context,
2468             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2469             &tag);
2470     pa_tagstruct_putu32(t, s->channel);
2471
2472     copy = *attr;
2473     patch_buffer_attr(s, &copy, NULL);
2474     attr = &copy;
2475
2476     pa_tagstruct_putu32(t, attr->maxlength);
2477
2478     if (s->direction == PA_STREAM_PLAYBACK)
2479         pa_tagstruct_put(
2480                 t,
2481                 PA_TAG_U32, attr->tlength,
2482                 PA_TAG_U32, attr->prebuf,
2483                 PA_TAG_U32, attr->minreq,
2484                 PA_TAG_INVALID);
2485     else
2486         pa_tagstruct_putu32(t, attr->fragsize);
2487
2488     if (s->context->version >= 13)
2489         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2490
2491     if (s->context->version >= 14)
2492         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2493
2494     pa_pstream_send_tagstruct(s->context->pstream, t);
2495     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2496
2497     /* This might cause changes in the read/write indexex, hence let's
2498      * request a timing update */
2499     request_auto_timing_update(s, TRUE);
2500
2501     return o;
2502 }
2503
2504 uint32_t pa_stream_get_device_index(pa_stream *s) {
2505     pa_assert(s);
2506     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2507
2508     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2509     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2510     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2511     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2512     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2513
2514     return s->device_index;
2515 }
2516
2517 const char *pa_stream_get_device_name(pa_stream *s) {
2518     pa_assert(s);
2519     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2520
2521     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2522     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2523     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2524     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2525     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2526
2527     return s->device_name;
2528 }
2529
2530 int pa_stream_is_suspended(pa_stream *s) {
2531     pa_assert(s);
2532     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2533
2534     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2535     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2536     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2537     PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2538
2539     return s->suspended;
2540 }
2541
2542 int pa_stream_is_corked(pa_stream *s) {
2543     pa_assert(s);
2544     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2545
2546     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2547     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2548     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2549
2550     return s->corked;
2551 }
2552
2553 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2554     pa_operation *o = userdata;
2555     int success = 1;
2556
2557     pa_assert(pd);
2558     pa_assert(o);
2559     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2560
2561     if (!o->context)
2562         goto finish;
2563
2564     if (command != PA_COMMAND_REPLY) {
2565         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2566             goto finish;
2567
2568         success = 0;
2569     } else {
2570
2571         if (!pa_tagstruct_eof(t)) {
2572             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2573             goto finish;
2574         }
2575     }
2576
2577     o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2578     pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2579
2580     if (o->callback) {
2581         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2582         cb(o->stream, success, o->userdata);
2583     }
2584
2585 finish:
2586     pa_operation_done(o);
2587     pa_operation_unref(o);
2588 }
2589
2590
2591 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2592     pa_operation *o;
2593     pa_tagstruct *t;
2594     uint32_t tag;
2595
2596     pa_assert(s);
2597     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2598
2599     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2600     PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2601     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2602     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2603     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2604     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2605
2606     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2607     o->private = PA_UINT_TO_PTR(rate);
2608
2609     t = pa_tagstruct_command(
2610             s->context,
2611             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2612             &tag);
2613     pa_tagstruct_putu32(t, s->channel);
2614     pa_tagstruct_putu32(t, rate);
2615
2616     pa_pstream_send_tagstruct(s->context->pstream, t);
2617     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2618
2619     return o;
2620 }
2621
2622 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2623     pa_operation *o;
2624     pa_tagstruct *t;
2625     uint32_t tag;
2626
2627     pa_assert(s);
2628     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2629
2630     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2631     PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2632     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2633     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2634     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2635
2636     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2637
2638     t = pa_tagstruct_command(
2639             s->context,
2640             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2641             &tag);
2642     pa_tagstruct_putu32(t, s->channel);
2643     pa_tagstruct_putu32(t, (uint32_t) mode);
2644     pa_tagstruct_put_proplist(t, p);
2645
2646     pa_pstream_send_tagstruct(s->context->pstream, t);
2647     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2648
2649     /* Please note that we don't update s->proplist here, because we
2650      * don't export that field */
2651
2652     return o;
2653 }
2654
2655 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2656     pa_operation *o;
2657     pa_tagstruct *t;
2658     uint32_t tag;
2659     const char * const*k;
2660
2661     pa_assert(s);
2662     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2663
2664     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2665     PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2666     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2667     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2668     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2669
2670     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2671
2672     t = pa_tagstruct_command(
2673             s->context,
2674             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2675             &tag);
2676     pa_tagstruct_putu32(t, s->channel);
2677
2678     for (k = keys; *k; k++)
2679         pa_tagstruct_puts(t, *k);
2680
2681     pa_tagstruct_puts(t, NULL);
2682
2683     pa_pstream_send_tagstruct(s->context->pstream, t);
2684     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2685
2686     /* Please note that we don't update s->proplist here, because we
2687      * don't export that field */
2688
2689     return o;
2690 }
2691
2692 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2693     pa_assert(s);
2694     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2695
2696     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2697     PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2698     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2699     PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2700
2701     s->direct_on_input = sink_input_idx;
2702
2703     return 0;
2704 }
2705
2706 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2707     pa_assert(s);
2708     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2709
2710     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2711     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2712     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2713
2714     return s->direct_on_input;
2715 }