merge glitch-free branch back into trunk
[profile/ivi/pulseaudio.git] / src / pulse / stream.c
1 /* $Id$ */
2
3 /***
4   This file is part of PulseAudio.
5
6   Copyright 2004-2006 Lennart Poettering
7   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
8
9   PulseAudio is free software; you can redistribute it and/or modify
10   it under the terms of the GNU Lesser General Public License as published
11   by the Free Software Foundation; either version 2 of the License,
12   or (at your option) any later version.
13
14   PulseAudio is distributed in the hope that it will be useful, but
15   WITHOUT ANY WARRANTY; without even the implied warranty of
16   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17   General Public License for more details.
18
19   You should have received a copy of the GNU Lesser General Public License
20   along with PulseAudio; if not, write to the Free Software
21   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
22   USA.
23 ***/
24
25 #ifdef HAVE_CONFIG_H
26 #include <config.h>
27 #endif
28
29 #include <string.h>
30 #include <stdio.h>
31 #include <string.h>
32
33 #include <pulse/def.h>
34 #include <pulse/timeval.h>
35 #include <pulse/xmalloc.h>
36
37 #include <pulsecore/pstream-util.h>
38 #include <pulsecore/log.h>
39 #include <pulsecore/hashmap.h>
40 #include <pulsecore/macro.h>
41 #include <pulsecore/rtclock.h>
42
43 #include "internal.h"
44
45 #define LATENCY_IPOL_INTERVAL_USEC (333*PA_USEC_PER_MSEC)
46
47 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
48 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
49 #define SMOOTHER_MIN_HISTORY (4)
50
51 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
52     return pa_stream_new_with_proplist(c, name, ss, map, NULL);
53 }
54
55 static void reset_callbacks(pa_stream *s) {
56     s->read_callback = NULL;
57     s->read_userdata = NULL;
58     s->write_callback = NULL;
59     s->write_userdata = NULL;
60     s->state_callback = NULL;
61     s->state_userdata = NULL;
62     s->overflow_callback = NULL;
63     s->overflow_userdata = NULL;
64     s->underflow_callback = NULL;
65     s->underflow_userdata = NULL;
66     s->latency_update_callback = NULL;
67     s->latency_update_userdata = NULL;
68     s->moved_callback = NULL;
69     s->moved_userdata = NULL;
70     s->suspended_callback = NULL;
71     s->suspended_userdata = NULL;
72     s->started_callback = NULL;
73     s->started_userdata = NULL;
74 }
75
76 pa_stream *pa_stream_new_with_proplist(
77         pa_context *c,
78         const char *name,
79         const pa_sample_spec *ss,
80         const pa_channel_map *map,
81         pa_proplist *p) {
82
83     pa_stream *s;
84     int i;
85     pa_channel_map tmap;
86
87     pa_assert(c);
88     pa_assert(PA_REFCNT_VALUE(c) >= 1);
89
90     PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
91     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE || ss->format != PA_SAMPLE_S32NE), PA_ERR_NOTSUPPORTED);
92     PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
93     PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
94
95     if (!map)
96         PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
97
98     s = pa_xnew(pa_stream, 1);
99     PA_REFCNT_INIT(s);
100     s->context = c;
101     s->mainloop = c->mainloop;
102
103     s->direction = PA_STREAM_NODIRECTION;
104     s->state = PA_STREAM_UNCONNECTED;
105     s->flags = 0;
106
107     s->sample_spec = *ss;
108     s->channel_map = *map;
109
110     s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
111     if (name)
112         pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
113
114     s->channel = 0;
115     s->channel_valid = FALSE;
116     s->syncid = c->csyncid++;
117     s->stream_index = PA_INVALID_INDEX;
118
119     s->requested_bytes = 0;
120     memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
121
122     s->device_index = PA_INVALID_INDEX;
123     s->device_name = NULL;
124     s->suspended = FALSE;
125
126     pa_memchunk_reset(&s->peek_memchunk);
127     s->peek_data = NULL;
128
129     s->record_memblockq = NULL;
130
131     s->corked = FALSE;
132
133     memset(&s->timing_info, 0, sizeof(s->timing_info));
134     s->timing_info_valid = FALSE;
135
136     s->previous_time = 0;
137
138     s->read_index_not_before = 0;
139     s->write_index_not_before = 0;
140     for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
141         s->write_index_corrections[i].valid = 0;
142     s->current_write_index_correction = 0;
143
144     s->auto_timing_update_event = NULL;
145     s->auto_timing_update_requested = FALSE;
146
147     reset_callbacks(s);
148
149     s->smoother = NULL;
150
151     /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
152     PA_LLIST_PREPEND(pa_stream, c->streams, s);
153     pa_stream_ref(s);
154
155     return s;
156 }
157
158 static void stream_unlink(pa_stream *s) {
159     pa_operation *o, *n;
160     pa_assert(s);
161
162     if (!s->context)
163         return;
164
165     /* Detach from context */
166
167     /* Unref all operatio object that point to us */
168     for (o = s->context->operations; o; o = n) {
169         n = o->next;
170
171         if (o->stream == s)
172             pa_operation_cancel(o);
173     }
174
175     /* Drop all outstanding replies for this stream */
176     if (s->context->pdispatch)
177         pa_pdispatch_unregister_reply(s->context->pdispatch, s);
178
179     if (s->channel_valid) {
180         pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL);
181         s->channel = 0;
182         s->channel_valid = FALSE;
183     }
184
185     PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
186     pa_stream_unref(s);
187
188     s->context = NULL;
189
190     if (s->auto_timing_update_event) {
191         pa_assert(s->mainloop);
192         s->mainloop->time_free(s->auto_timing_update_event);
193     }
194
195     reset_callbacks(s);
196 }
197
198 static void stream_free(pa_stream *s) {
199     pa_assert(s);
200
201     stream_unlink(s);
202
203     if (s->peek_memchunk.memblock) {
204         if (s->peek_data)
205             pa_memblock_release(s->peek_memchunk.memblock);
206         pa_memblock_unref(s->peek_memchunk.memblock);
207     }
208
209     if (s->record_memblockq)
210         pa_memblockq_free(s->record_memblockq);
211
212     if (s->proplist)
213         pa_proplist_free(s->proplist);
214
215     if (s->smoother)
216         pa_smoother_free(s->smoother);
217
218     pa_xfree(s->device_name);
219     pa_xfree(s);
220 }
221
222 void pa_stream_unref(pa_stream *s) {
223     pa_assert(s);
224     pa_assert(PA_REFCNT_VALUE(s) >= 1);
225
226     if (PA_REFCNT_DEC(s) <= 0)
227         stream_free(s);
228 }
229
230 pa_stream* pa_stream_ref(pa_stream *s) {
231     pa_assert(s);
232     pa_assert(PA_REFCNT_VALUE(s) >= 1);
233
234     PA_REFCNT_INC(s);
235     return s;
236 }
237
238 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
239     pa_assert(s);
240     pa_assert(PA_REFCNT_VALUE(s) >= 1);
241
242     return s->state;
243 }
244
245 pa_context* pa_stream_get_context(pa_stream *s) {
246     pa_assert(s);
247     pa_assert(PA_REFCNT_VALUE(s) >= 1);
248
249     return s->context;
250 }
251
252 uint32_t pa_stream_get_index(pa_stream *s) {
253     pa_assert(s);
254     pa_assert(PA_REFCNT_VALUE(s) >= 1);
255
256     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
257
258     return s->stream_index;
259 }
260
261 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
262     pa_assert(s);
263     pa_assert(PA_REFCNT_VALUE(s) >= 1);
264
265     if (s->state == st)
266         return;
267
268     pa_stream_ref(s);
269
270     s->state = st;
271
272     if (s->state_callback)
273         s->state_callback(s, s->state_userdata);
274
275     if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
276         stream_unlink(s);
277
278     pa_stream_unref(s);
279 }
280
281 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
282     pa_assert(s);
283     pa_assert(PA_REFCNT_VALUE(s) >= 1);
284
285     if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
286         return;
287
288     if (s->state == PA_STREAM_READY &&
289         (force || !s->auto_timing_update_requested)) {
290         pa_operation *o;
291
292 /*         pa_log("automatically requesting new timing data"); */
293
294         if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
295             pa_operation_unref(o);
296             s->auto_timing_update_requested = TRUE;
297         }
298     }
299
300     if (s->auto_timing_update_event) {
301         struct timeval next;
302         pa_gettimeofday(&next);
303         pa_timeval_add(&next, LATENCY_IPOL_INTERVAL_USEC);
304         s->mainloop->time_restart(s->auto_timing_update_event, &next);
305     }
306 }
307
308 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
309     pa_context *c = userdata;
310     pa_stream *s;
311     uint32_t channel;
312
313     pa_assert(pd);
314     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
315     pa_assert(t);
316     pa_assert(c);
317     pa_assert(PA_REFCNT_VALUE(c) >= 1);
318
319     pa_context_ref(c);
320
321     if (pa_tagstruct_getu32(t, &channel) < 0 ||
322         !pa_tagstruct_eof(t)) {
323         pa_context_fail(c, PA_ERR_PROTOCOL);
324         goto finish;
325     }
326
327     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel)))
328         goto finish;
329
330     if (s->state != PA_STREAM_READY)
331         goto finish;
332
333     pa_context_set_error(c, PA_ERR_KILLED);
334     pa_stream_set_state(s, PA_STREAM_FAILED);
335
336 finish:
337     pa_context_unref(c);
338 }
339
340 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
341     pa_context *c = userdata;
342     pa_stream *s;
343     uint32_t channel;
344     const char *dn;
345     pa_bool_t suspended;
346     uint32_t di;
347     pa_usec_t usec;
348     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
349
350     pa_assert(pd);
351     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
352     pa_assert(t);
353     pa_assert(c);
354     pa_assert(PA_REFCNT_VALUE(c) >= 1);
355
356     pa_context_ref(c);
357
358     if (c->version < 12) {
359         pa_context_fail(c, PA_ERR_PROTOCOL);
360         goto finish;
361     }
362
363     if (pa_tagstruct_getu32(t, &channel) < 0 ||
364         pa_tagstruct_getu32(t, &di) < 0 ||
365         pa_tagstruct_gets(t, &dn) < 0 ||
366         pa_tagstruct_get_boolean(t, &suspended) < 0) {
367         pa_context_fail(c, PA_ERR_PROTOCOL);
368         goto finish;
369     }
370
371     if (c->version >= 13) {
372
373         if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
374             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
375                 pa_tagstruct_getu32(t, &fragsize) < 0 ||
376                 pa_tagstruct_get_usec(t, &usec) < 0) {
377                 pa_context_fail(c, PA_ERR_PROTOCOL);
378                 goto finish;
379             }
380         } else {
381             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
382                 pa_tagstruct_getu32(t, &tlength) < 0 ||
383                 pa_tagstruct_getu32(t, &prebuf) < 0 ||
384                 pa_tagstruct_getu32(t, &minreq) < 0 ||
385                 pa_tagstruct_get_usec(t, &usec) < 0) {
386                 pa_context_fail(c, PA_ERR_PROTOCOL);
387                 goto finish;
388             }
389         }
390     }
391
392     if (!pa_tagstruct_eof(t)) {
393         pa_context_fail(c, PA_ERR_PROTOCOL);
394         goto finish;
395     }
396
397     if (!dn || di == PA_INVALID_INDEX) {
398         pa_context_fail(c, PA_ERR_PROTOCOL);
399         goto finish;
400     }
401
402     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, channel)))
403         goto finish;
404
405     if (s->state != PA_STREAM_READY)
406         goto finish;
407
408     if (c->version >= 13) {
409         if (s->direction == PA_STREAM_RECORD)
410             s->timing_info.configured_source_usec = usec;
411         else
412             s->timing_info.configured_sink_usec = usec;
413
414         s->buffer_attr.maxlength = maxlength;
415         s->buffer_attr.fragsize = fragsize;
416         s->buffer_attr.tlength = tlength;
417         s->buffer_attr.prebuf = prebuf;
418         s->buffer_attr.minreq = minreq;
419     }
420
421     pa_xfree(s->device_name);
422     s->device_name = pa_xstrdup(dn);
423     s->device_index = di;
424
425     s->suspended = suspended;
426
427     request_auto_timing_update(s, TRUE);
428
429     if (s->moved_callback)
430         s->moved_callback(s, s->moved_userdata);
431
432 finish:
433     pa_context_unref(c);
434 }
435
436 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
437     pa_context *c = userdata;
438     pa_stream *s;
439     uint32_t channel;
440     pa_bool_t suspended;
441
442     pa_assert(pd);
443     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
444     pa_assert(t);
445     pa_assert(c);
446     pa_assert(PA_REFCNT_VALUE(c) >= 1);
447
448     pa_context_ref(c);
449
450     if (c->version < 12) {
451         pa_context_fail(c, PA_ERR_PROTOCOL);
452         goto finish;
453     }
454
455     if (pa_tagstruct_getu32(t, &channel) < 0 ||
456         pa_tagstruct_get_boolean(t, &suspended) < 0 ||
457         !pa_tagstruct_eof(t)) {
458         pa_context_fail(c, PA_ERR_PROTOCOL);
459         goto finish;
460     }
461
462     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, channel)))
463         goto finish;
464
465     if (s->state != PA_STREAM_READY)
466         goto finish;
467
468     s->suspended = suspended;
469
470     if (s->smoother) {
471         pa_usec_t x = pa_rtclock_usec();
472
473         if (s->timing_info_valid)
474             x -= s->timing_info.transport_usec;
475
476         if (s->suspended || s->corked)
477             pa_smoother_pause(s->smoother, x);
478     }
479
480     request_auto_timing_update(s, TRUE);
481
482     if (s->suspended_callback)
483         s->suspended_callback(s, s->suspended_userdata);
484
485 finish:
486     pa_context_unref(c);
487 }
488
489 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
490     pa_context *c = userdata;
491     pa_stream *s;
492     uint32_t channel;
493
494     pa_assert(pd);
495     pa_assert(command == PA_COMMAND_STARTED);
496     pa_assert(t);
497     pa_assert(c);
498     pa_assert(PA_REFCNT_VALUE(c) >= 1);
499
500     pa_context_ref(c);
501
502     if (c->version < 13) {
503         pa_context_fail(c, PA_ERR_PROTOCOL);
504         goto finish;
505     }
506
507     if (pa_tagstruct_getu32(t, &channel) < 0 ||
508         !pa_tagstruct_eof(t)) {
509         pa_context_fail(c, PA_ERR_PROTOCOL);
510         goto finish;
511     }
512
513     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
514         goto finish;
515
516     if (s->state != PA_STREAM_READY)
517         goto finish;
518
519     request_auto_timing_update(s, TRUE);
520
521     if (s->started_callback)
522         s->started_callback(s, s->suspended_userdata);
523
524 finish:
525     pa_context_unref(c);
526 }
527
528 void pa_command_request(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
529     pa_stream *s;
530     pa_context *c = userdata;
531     uint32_t bytes, channel;
532
533     pa_assert(pd);
534     pa_assert(command == PA_COMMAND_REQUEST);
535     pa_assert(t);
536     pa_assert(c);
537     pa_assert(PA_REFCNT_VALUE(c) >= 1);
538
539     pa_context_ref(c);
540
541     if (pa_tagstruct_getu32(t, &channel) < 0 ||
542         pa_tagstruct_getu32(t, &bytes) < 0 ||
543         !pa_tagstruct_eof(t)) {
544         pa_context_fail(c, PA_ERR_PROTOCOL);
545         goto finish;
546     }
547
548     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
549         goto finish;
550
551     if (s->state != PA_STREAM_READY)
552         goto finish;
553
554     s->requested_bytes += bytes;
555
556     if (s->requested_bytes > 0 && s->write_callback)
557         s->write_callback(s, s->requested_bytes, s->write_userdata);
558
559 finish:
560     pa_context_unref(c);
561 }
562
563 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
564     pa_stream *s;
565     pa_context *c = userdata;
566     uint32_t channel;
567
568     pa_assert(pd);
569     pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
570     pa_assert(t);
571     pa_assert(c);
572     pa_assert(PA_REFCNT_VALUE(c) >= 1);
573
574     pa_context_ref(c);
575
576     if (pa_tagstruct_getu32(t, &channel) < 0 ||
577         !pa_tagstruct_eof(t)) {
578         pa_context_fail(c, PA_ERR_PROTOCOL);
579         goto finish;
580     }
581
582     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
583         goto finish;
584
585     if (s->state != PA_STREAM_READY)
586         goto finish;
587
588     if (s->smoother)
589         if (s->direction == PA_STREAM_PLAYBACK && s->buffer_attr.prebuf > 0) {
590             pa_usec_t x = pa_rtclock_usec();
591
592             if (s->timing_info_valid)
593                 x -= s->timing_info.transport_usec;
594
595             pa_smoother_pause(s->smoother, x);
596         }
597
598     request_auto_timing_update(s, TRUE);
599
600     if (s->state == PA_STREAM_READY) {
601
602         if (command == PA_COMMAND_OVERFLOW) {
603             if (s->overflow_callback)
604                 s->overflow_callback(s, s->overflow_userdata);
605         } else if (command == PA_COMMAND_UNDERFLOW) {
606             if (s->underflow_callback)
607                 s->underflow_callback(s, s->underflow_userdata);
608         }
609     }
610
611  finish:
612     pa_context_unref(c);
613 }
614
615 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
616     pa_assert(s);
617     pa_assert(PA_REFCNT_VALUE(s) >= 1);
618
619 /*     pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
620
621     if (s->state != PA_STREAM_READY)
622         return;
623
624     if (w) {
625         s->write_index_not_before = s->context->ctag;
626
627         if (s->timing_info_valid)
628             s->timing_info.write_index_corrupt = 1;
629
630 /*         pa_log("write_index invalidated"); */
631     }
632
633     if (r) {
634         s->read_index_not_before = s->context->ctag;
635
636         if (s->timing_info_valid)
637             s->timing_info.read_index_corrupt = 1;
638
639 /*         pa_log("read_index invalidated"); */
640     }
641
642     request_auto_timing_update(s, TRUE);
643 }
644
645 static void auto_timing_update_callback(PA_GCC_UNUSED pa_mainloop_api *m, PA_GCC_UNUSED pa_time_event *e, PA_GCC_UNUSED const struct timeval *tv, void *userdata) {
646     pa_stream *s = userdata;
647
648     pa_assert(s);
649     pa_assert(PA_REFCNT_VALUE(s) >= 1);
650
651     pa_stream_ref(s);
652     request_auto_timing_update(s, FALSE);
653     pa_stream_unref(s);
654 }
655
656 static void create_stream_complete(pa_stream *s) {
657     pa_assert(s);
658     pa_assert(PA_REFCNT_VALUE(s) >= 1);
659     pa_assert(s->state == PA_STREAM_CREATING);
660
661     pa_stream_set_state(s, PA_STREAM_READY);
662
663     if (s->requested_bytes > 0 && s->write_callback)
664         s->write_callback(s, s->requested_bytes, s->write_userdata);
665
666     if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
667         struct timeval tv;
668         pa_gettimeofday(&tv);
669         tv.tv_usec += LATENCY_IPOL_INTERVAL_USEC; /* every 100 ms */
670         pa_assert(!s->auto_timing_update_event);
671         s->auto_timing_update_event = s->mainloop->time_new(s->mainloop, &tv, &auto_timing_update_callback, s);
672
673         request_auto_timing_update(s, TRUE);
674     }
675 }
676
677 static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_sample_spec *ss) {
678     pa_assert(s);
679     pa_assert(attr);
680     pa_assert(ss);
681
682     if (s->context->version >= 13)
683         return;
684
685     /* Version older than 0.9.10 didn't do server side buffer_attr
686      * selection, hence we have to fake it on the client side */
687
688     if (!attr->maxlength <= 0)
689         attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
690
691     if (!attr->tlength <= 0)
692         attr->tlength = pa_bytes_per_second(ss)*2; /* 2s of buffering */
693
694     if (!attr->minreq <= 0)
695         attr->minreq = (2*attr->tlength)/10; /* Ask for more data when there are only 200ms left in the playback buffer */
696
697     if (!attr->prebuf)
698         attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
699
700     if (!attr->fragsize)
701         attr->fragsize  = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
702 }
703
704 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
705     pa_stream *s = userdata;
706
707     pa_assert(pd);
708     pa_assert(s);
709     pa_assert(PA_REFCNT_VALUE(s) >= 1);
710     pa_assert(s->state == PA_STREAM_CREATING);
711
712     pa_stream_ref(s);
713
714     if (command != PA_COMMAND_REPLY) {
715         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
716             goto finish;
717
718         pa_stream_set_state(s, PA_STREAM_FAILED);
719         goto finish;
720     }
721
722     if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
723         s->channel == PA_INVALID_INDEX ||
724         ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 ||  s->stream_index == PA_INVALID_INDEX)) ||
725         ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &s->requested_bytes) < 0)) {
726         pa_context_fail(s->context, PA_ERR_PROTOCOL);
727         goto finish;
728     }
729
730     if (s->context->version >= 9) {
731         if (s->direction == PA_STREAM_PLAYBACK) {
732             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
733                 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
734                 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
735                 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
736                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
737                 goto finish;
738             }
739         } else if (s->direction == PA_STREAM_RECORD) {
740             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
741                 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
742                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
743                 goto finish;
744             }
745         }
746     }
747
748     if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
749         pa_sample_spec ss;
750         pa_channel_map cm;
751         const char *dn = NULL;
752         pa_bool_t suspended;
753
754         if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
755             pa_tagstruct_get_channel_map(t, &cm) < 0 ||
756             pa_tagstruct_getu32(t, &s->device_index) < 0 ||
757             pa_tagstruct_gets(t, &dn) < 0 ||
758             pa_tagstruct_get_boolean(t, &suspended) < 0) {
759             pa_context_fail(s->context, PA_ERR_PROTOCOL);
760             goto finish;
761         }
762
763         if (!dn || s->device_index == PA_INVALID_INDEX ||
764             ss.channels != cm.channels ||
765             !pa_channel_map_valid(&cm) ||
766             !pa_sample_spec_valid(&ss) ||
767             (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
768             (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
769             (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))) {
770             pa_context_fail(s->context, PA_ERR_PROTOCOL);
771             goto finish;
772         }
773
774         pa_xfree(s->device_name);
775         s->device_name = pa_xstrdup(dn);
776         s->suspended = suspended;
777
778         s->channel_map = cm;
779         s->sample_spec = ss;
780     }
781
782     if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
783         pa_usec_t usec;
784
785         if (pa_tagstruct_get_usec(t, &usec) < 0) {
786             pa_context_fail(s->context, PA_ERR_PROTOCOL);
787             goto finish;
788         }
789
790         if (s->direction == PA_STREAM_RECORD)
791             s->timing_info.configured_source_usec = usec;
792         else
793             s->timing_info.configured_sink_usec = usec;
794     }
795
796     if (!pa_tagstruct_eof(t)) {
797         pa_context_fail(s->context, PA_ERR_PROTOCOL);
798         goto finish;
799     }
800
801     if (s->direction == PA_STREAM_RECORD) {
802         pa_assert(!s->record_memblockq);
803
804         s->record_memblockq = pa_memblockq_new(
805                 0,
806                 s->buffer_attr.maxlength,
807                 0,
808                 pa_frame_size(&s->sample_spec),
809                 1,
810                 0,
811                 0,
812                 NULL);
813     }
814
815     s->channel_valid = TRUE;
816     pa_dynarray_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, s->channel, s);
817
818     create_stream_complete(s);
819
820 finish:
821     pa_stream_unref(s);
822 }
823
824 static int create_stream(
825         pa_stream_direction_t direction,
826         pa_stream *s,
827         const char *dev,
828         const pa_buffer_attr *attr,
829         pa_stream_flags_t flags,
830         const pa_cvolume *volume,
831         pa_stream *sync_stream) {
832
833     pa_tagstruct *t;
834     uint32_t tag;
835
836     pa_assert(s);
837     pa_assert(PA_REFCNT_VALUE(s) >= 1);
838     pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
839
840     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
841     PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
842                                               PA_STREAM_INTERPOLATE_TIMING|
843                                               PA_STREAM_NOT_MONOTONOUS|
844                                               PA_STREAM_AUTO_TIMING_UPDATE|
845                                               PA_STREAM_NO_REMAP_CHANNELS|
846                                               PA_STREAM_NO_REMIX_CHANNELS|
847                                               PA_STREAM_FIX_FORMAT|
848                                               PA_STREAM_FIX_RATE|
849                                               PA_STREAM_FIX_CHANNELS|
850                                               PA_STREAM_DONT_MOVE|
851                                               PA_STREAM_VARIABLE_RATE|
852                                               PA_STREAM_PEAK_DETECT|
853                                               PA_STREAM_START_MUTED|
854                                               PA_STREAM_ADJUST_LATENCY)), PA_ERR_INVALID);
855
856     PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
857     PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
858     /* Althought some of the other flags are not supported on older
859      * version, we don't check for them here, because it doesn't hurt
860      * when they are passed but actually not supported. This makes
861      * client development easier */
862
863     PA_CHECK_VALIDITY(s->context, direction != PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
864     PA_CHECK_VALIDITY(s->context, direction != PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
865     PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
866     PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
867
868     pa_stream_ref(s);
869
870     s->direction = direction;
871     s->flags = flags;
872
873     if (sync_stream)
874         s->syncid = sync_stream->syncid;
875
876     if (attr)
877         s->buffer_attr = *attr;
878     automatic_buffer_attr(s, &s->buffer_attr, &s->sample_spec);
879
880     if (flags & PA_STREAM_INTERPOLATE_TIMING) {
881         pa_usec_t x;
882
883         if (s->smoother)
884             pa_smoother_free(s->smoother);
885
886         s->smoother = pa_smoother_new(SMOOTHER_ADJUST_TIME, SMOOTHER_HISTORY_TIME, !(flags & PA_STREAM_NOT_MONOTONOUS), SMOOTHER_MIN_HISTORY);
887
888         x = pa_rtclock_usec();
889         pa_smoother_set_time_offset(s->smoother, x);
890         pa_smoother_pause(s->smoother, x);
891     }
892
893     if (!dev)
894         dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
895
896     t = pa_tagstruct_command(
897             s->context,
898             s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM,
899             &tag);
900
901     if (s->context->version < 13)
902         pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
903
904     pa_tagstruct_put(
905             t,
906             PA_TAG_SAMPLE_SPEC, &s->sample_spec,
907             PA_TAG_CHANNEL_MAP, &s->channel_map,
908             PA_TAG_U32, PA_INVALID_INDEX,
909             PA_TAG_STRING, dev,
910             PA_TAG_U32, s->buffer_attr.maxlength,
911             PA_TAG_BOOLEAN, !!(flags & PA_STREAM_START_CORKED),
912             PA_TAG_INVALID);
913
914     if (s->direction == PA_STREAM_PLAYBACK) {
915         pa_cvolume cv;
916
917         pa_tagstruct_put(
918                 t,
919                 PA_TAG_U32, s->buffer_attr.tlength,
920                 PA_TAG_U32, s->buffer_attr.prebuf,
921                 PA_TAG_U32, s->buffer_attr.minreq,
922                 PA_TAG_U32, s->syncid,
923                 PA_TAG_INVALID);
924
925         if (!volume)
926             volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
927
928         pa_tagstruct_put_cvolume(t, volume);
929     } else
930         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
931
932     if (s->context->version >= 12) {
933         pa_tagstruct_put(
934                 t,
935                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
936                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
937                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
938                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
939                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
940                 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
941                 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
942                 PA_TAG_INVALID);
943     }
944
945     if (s->context->version >= 13) {
946
947         if (s->direction == PA_STREAM_PLAYBACK)
948             pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
949         else
950             pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
951
952         pa_tagstruct_put(
953                 t,
954                 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
955                 PA_TAG_PROPLIST, s->proplist,
956                 PA_TAG_INVALID);
957     }
958
959     pa_pstream_send_tagstruct(s->context->pstream, t);
960     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
961
962     pa_stream_set_state(s, PA_STREAM_CREATING);
963
964     pa_stream_unref(s);
965     return 0;
966 }
967
968 int pa_stream_connect_playback(
969         pa_stream *s,
970         const char *dev,
971         const pa_buffer_attr *attr,
972         pa_stream_flags_t flags,
973         pa_cvolume *volume,
974         pa_stream *sync_stream) {
975
976     pa_assert(s);
977     pa_assert(PA_REFCNT_VALUE(s) >= 1);
978
979     return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
980 }
981
982 int pa_stream_connect_record(
983         pa_stream *s,
984         const char *dev,
985         const pa_buffer_attr *attr,
986         pa_stream_flags_t flags) {
987
988     pa_assert(s);
989     pa_assert(PA_REFCNT_VALUE(s) >= 1);
990
991     return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
992 }
993
994 int pa_stream_write(
995         pa_stream *s,
996         const void *data,
997         size_t length,
998         void (*free_cb)(void *p),
999         int64_t offset,
1000         pa_seek_mode_t seek) {
1001
1002     pa_memchunk chunk;
1003     pa_seek_mode_t t_seek;
1004     int64_t t_offset;
1005     size_t t_length;
1006     const void *t_data;
1007
1008     pa_assert(s);
1009     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1010     pa_assert(data);
1011
1012     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1013     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1014     PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1015     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1016
1017     if (length <= 0)
1018         return 0;
1019
1020     t_seek = seek;
1021     t_offset = offset;
1022     t_length = length;
1023     t_data = data;
1024
1025     while (t_length > 0) {
1026
1027         chunk.index = 0;
1028
1029         if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1030             chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1031             chunk.length = t_length;
1032         } else {
1033             void *d;
1034
1035             chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1036             chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1037
1038             d = pa_memblock_acquire(chunk.memblock);
1039             memcpy(d, t_data, chunk.length);
1040             pa_memblock_release(chunk.memblock);
1041         }
1042
1043         pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1044
1045         t_offset = 0;
1046         t_seek = PA_SEEK_RELATIVE;
1047
1048         t_data = (const uint8_t*) t_data + chunk.length;
1049         t_length -= chunk.length;
1050
1051         pa_memblock_unref(chunk.memblock);
1052     }
1053
1054     if (free_cb && pa_pstream_get_shm(s->context->pstream))
1055         free_cb((void*) data);
1056
1057     if (length < s->requested_bytes)
1058         s->requested_bytes -= length;
1059     else
1060         s->requested_bytes = 0;
1061
1062     if (s->direction == PA_STREAM_PLAYBACK) {
1063
1064         /* Update latency request correction */
1065         if (s->write_index_corrections[s->current_write_index_correction].valid) {
1066
1067             if (seek == PA_SEEK_ABSOLUTE) {
1068                 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1069                 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1070                 s->write_index_corrections[s->current_write_index_correction].value = offset + length;
1071             } else if (seek == PA_SEEK_RELATIVE) {
1072                 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1073                     s->write_index_corrections[s->current_write_index_correction].value += offset + length;
1074             } else
1075                 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1076         }
1077
1078         /* Update the write index in the already available latency data */
1079         if (s->timing_info_valid) {
1080
1081             if (seek == PA_SEEK_ABSOLUTE) {
1082                 s->timing_info.write_index_corrupt = FALSE;
1083                 s->timing_info.write_index = offset + length;
1084             } else if (seek == PA_SEEK_RELATIVE) {
1085                 if (!s->timing_info.write_index_corrupt)
1086                     s->timing_info.write_index += offset + length;
1087             } else
1088                 s->timing_info.write_index_corrupt = TRUE;
1089         }
1090
1091         if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1092             request_auto_timing_update(s, TRUE);
1093     }
1094
1095     return 0;
1096 }
1097
1098 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1099     pa_assert(s);
1100     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1101     pa_assert(data);
1102     pa_assert(length);
1103
1104     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1105     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1106
1107     if (!s->peek_memchunk.memblock) {
1108
1109         if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1110             *data = NULL;
1111             *length = 0;
1112             return 0;
1113         }
1114
1115         s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1116     }
1117
1118     pa_assert(s->peek_data);
1119     *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1120     *length = s->peek_memchunk.length;
1121     return 0;
1122 }
1123
1124 int pa_stream_drop(pa_stream *s) {
1125     pa_assert(s);
1126     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1127
1128     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1129     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1130     PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1131
1132     pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1133
1134     /* Fix the simulated local read index */
1135     if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1136         s->timing_info.read_index += s->peek_memchunk.length;
1137
1138     pa_assert(s->peek_data);
1139     pa_memblock_release(s->peek_memchunk.memblock);
1140     pa_memblock_unref(s->peek_memchunk.memblock);
1141     pa_memchunk_reset(&s->peek_memchunk);
1142
1143     return 0;
1144 }
1145
1146 size_t pa_stream_writable_size(pa_stream *s) {
1147     pa_assert(s);
1148     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1149
1150     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1151     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1152
1153     return s->requested_bytes;
1154 }
1155
1156 size_t pa_stream_readable_size(pa_stream *s) {
1157     pa_assert(s);
1158     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1159
1160     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1161     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1162
1163     return pa_memblockq_get_length(s->record_memblockq);
1164 }
1165
1166 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1167     pa_operation *o;
1168     pa_tagstruct *t;
1169     uint32_t tag;
1170
1171     pa_assert(s);
1172     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1173
1174     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1175     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1176
1177     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1178
1179     t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1180     pa_tagstruct_putu32(t, s->channel);
1181     pa_pstream_send_tagstruct(s->context->pstream, t);
1182     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1183
1184     return o;
1185 }
1186
1187 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1188     pa_usec_t usec;
1189
1190     pa_assert(s);
1191     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1192     pa_assert(s->state == PA_STREAM_READY);
1193     pa_assert(s->direction != PA_STREAM_UPLOAD);
1194     pa_assert(s->timing_info_valid);
1195     pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1196     pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1197
1198     if (s->direction == PA_STREAM_PLAYBACK) {
1199         /* The last byte that was written into the output device
1200          * had this time value associated */
1201         usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1202
1203         if (!s->corked && !s->suspended) {
1204
1205             if (!ignore_transport)
1206                 /* Because the latency info took a little time to come
1207                  * to us, we assume that the real output time is actually
1208                  * a little ahead */
1209                 usec += s->timing_info.transport_usec;
1210
1211             /* However, the output device usually maintains a buffer
1212                too, hence the real sample currently played is a little
1213                back  */
1214             if (s->timing_info.sink_usec >= usec)
1215                 usec = 0;
1216             else
1217                 usec -= s->timing_info.sink_usec;
1218         }
1219
1220     } else if (s->direction == PA_STREAM_RECORD) {
1221         /* The last byte written into the server side queue had
1222          * this time value associated */
1223         usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1224
1225         if (!s->corked && !s->suspended) {
1226
1227             if (!ignore_transport)
1228                 /* Add transport latency */
1229                 usec += s->timing_info.transport_usec;
1230
1231             /* Add latency of data in device buffer */
1232             usec += s->timing_info.source_usec;
1233
1234             /* If this is a monitor source, we need to correct the
1235              * time by the playback device buffer */
1236             if (s->timing_info.sink_usec >= usec)
1237                 usec = 0;
1238             else
1239                 usec -= s->timing_info.sink_usec;
1240         }
1241     }
1242
1243     return usec;
1244 }
1245
1246 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1247     pa_operation *o = userdata;
1248     struct timeval local, remote, now;
1249     pa_timing_info *i;
1250     pa_bool_t playing = FALSE;
1251     uint64_t underrun_for = 0, playing_for = 0;
1252
1253     pa_assert(pd);
1254     pa_assert(o);
1255     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1256
1257     if (!o->context || !o->stream)
1258         goto finish;
1259
1260     i = &o->stream->timing_info;
1261
1262 /*     pa_log("pre corrupt w:%u r:%u\n", !o->stream->timing_info_valid || i->write_index_corrupt,!o->stream->timing_info_valid || i->read_index_corrupt); */
1263
1264     o->stream->timing_info_valid = FALSE;
1265     i->write_index_corrupt = FALSE;
1266     i->read_index_corrupt = FALSE;
1267
1268 /*     pa_log("timing update %u\n", tag); */
1269
1270     if (command != PA_COMMAND_REPLY) {
1271         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1272             goto finish;
1273
1274     } else {
1275
1276         if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1277             pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1278             pa_tagstruct_get_boolean(t, &playing) < 0 ||
1279             pa_tagstruct_get_timeval(t, &local) < 0 ||
1280             pa_tagstruct_get_timeval(t, &remote) < 0 ||
1281             pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1282             pa_tagstruct_gets64(t, &i->read_index) < 0) {
1283
1284             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1285             goto finish;
1286         }
1287
1288         if (o->context->version >= 13 &&
1289             o->stream->direction == PA_STREAM_PLAYBACK)
1290             if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1291                 pa_tagstruct_getu64(t, &playing_for) < 0) {
1292
1293                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1294                 goto finish;
1295             }
1296
1297
1298         if (!pa_tagstruct_eof(t)) {
1299             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1300             goto finish;
1301         }
1302
1303         o->stream->timing_info_valid = TRUE;
1304         i->playing = (int) playing;
1305         i->since_underrun = playing ? playing_for : underrun_for;
1306
1307         pa_gettimeofday(&now);
1308
1309         /* Calculcate timestamps */
1310         if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1311             /* local and remote seem to have synchronized clocks */
1312
1313             if (o->stream->direction == PA_STREAM_PLAYBACK)
1314                 i->transport_usec = pa_timeval_diff(&remote, &local);
1315             else
1316                 i->transport_usec = pa_timeval_diff(&now, &remote);
1317
1318             i->synchronized_clocks = TRUE;
1319             i->timestamp = remote;
1320         } else {
1321             /* clocks are not synchronized, let's estimate latency then */
1322             i->transport_usec = pa_timeval_diff(&now, &local)/2;
1323             i->synchronized_clocks = FALSE;
1324             i->timestamp = local;
1325             pa_timeval_add(&i->timestamp, i->transport_usec);
1326         }
1327
1328         /* Invalidate read and write indexes if necessary */
1329         if (tag < o->stream->read_index_not_before)
1330             i->read_index_corrupt = TRUE;
1331
1332         if (tag < o->stream->write_index_not_before)
1333             i->write_index_corrupt = TRUE;
1334
1335         if (o->stream->direction == PA_STREAM_PLAYBACK) {
1336             /* Write index correction */
1337
1338             int n, j;
1339             uint32_t ctag = tag;
1340
1341             /* Go through the saved correction values and add up the total correction.*/
1342
1343             for (n = 0, j = o->stream->current_write_index_correction+1;
1344                  n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1345                  n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1346
1347                 /* Step over invalid data or out-of-date data */
1348                 if (!o->stream->write_index_corrections[j].valid ||
1349                     o->stream->write_index_corrections[j].tag < ctag)
1350                     continue;
1351
1352                 /* Make sure that everything is in order */
1353                 ctag = o->stream->write_index_corrections[j].tag+1;
1354
1355                 /* Now fix the write index */
1356                 if (o->stream->write_index_corrections[j].corrupt) {
1357                     /* A corrupting seek was made */
1358                     i->write_index = 0;
1359                     i->write_index_corrupt = TRUE;
1360                 } else if (o->stream->write_index_corrections[j].absolute) {
1361                     /* An absolute seek was made */
1362                     i->write_index = o->stream->write_index_corrections[j].value;
1363                     i->write_index_corrupt = FALSE;
1364                 } else if (!i->write_index_corrupt) {
1365                     /* A relative seek was made */
1366                     i->write_index += o->stream->write_index_corrections[j].value;
1367                 }
1368             }
1369         }
1370
1371         if (o->stream->direction == PA_STREAM_RECORD) {
1372             /* Read index correction */
1373
1374             if (!i->read_index_corrupt)
1375                 i->read_index -= pa_memblockq_get_length(o->stream->record_memblockq);
1376         }
1377
1378 /*     pa_log("post corrupt w:%u r:%u\n", i->write_index_corrupt || !o->stream->timing_info_valid, i->read_index_corrupt || !o->stream->timing_info_valid); */
1379
1380         /* Clear old correction entries */
1381         if (o->stream->direction == PA_STREAM_PLAYBACK) {
1382             int n;
1383
1384             for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1385                 if (!o->stream->write_index_corrections[n].valid)
1386                     continue;
1387
1388                 if (o->stream->write_index_corrections[n].tag <= tag)
1389                     o->stream->write_index_corrections[n].valid = FALSE;
1390             }
1391         }
1392
1393         /* Update smoother */
1394         if (o->stream->smoother) {
1395             pa_usec_t u, x;
1396
1397             u = x = pa_rtclock_usec() - i->transport_usec;
1398
1399             if (o->stream->direction == PA_STREAM_PLAYBACK &&
1400                 o->context->version >= 13) {
1401                 pa_usec_t su;
1402
1403                 /* If we weren't playing then it will take some time
1404                  * until the audio will actually come out through the
1405                  * speakers. Since we follow that timing here, we need
1406                  * to try to fix this up */
1407
1408                 su = pa_bytes_to_usec(i->since_underrun, &o->stream->sample_spec);
1409
1410                 if (su < i->sink_usec)
1411                     x += i->sink_usec - su;
1412             }
1413
1414             if (!i->playing)
1415                 pa_smoother_pause(o->stream->smoother, x);
1416
1417             /* Update the smoother */
1418             if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1419                 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1420                 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1421
1422             if (i->playing)
1423                 pa_smoother_resume(o->stream->smoother, x);
1424         }
1425     }
1426
1427     o->stream->auto_timing_update_requested = FALSE;
1428
1429     if (o->stream->latency_update_callback)
1430         o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1431
1432     if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1433         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1434         cb(o->stream, o->stream->timing_info_valid, o->userdata);
1435     }
1436
1437 finish:
1438
1439     pa_operation_done(o);
1440     pa_operation_unref(o);
1441 }
1442
1443 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1444     uint32_t tag;
1445     pa_operation *o;
1446     pa_tagstruct *t;
1447     struct timeval now;
1448     int cidx = 0;
1449
1450     pa_assert(s);
1451     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1452
1453     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1454     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1455
1456     if (s->direction == PA_STREAM_PLAYBACK) {
1457         /* Find a place to store the write_index correction data for this entry */
1458         cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1459
1460         /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1461         PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1462     }
1463     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1464
1465     t = pa_tagstruct_command(
1466             s->context,
1467             s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY,
1468             &tag);
1469     pa_tagstruct_putu32(t, s->channel);
1470     pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1471
1472     pa_pstream_send_tagstruct(s->context->pstream, t);
1473     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1474
1475     if (s->direction == PA_STREAM_PLAYBACK) {
1476         /* Fill in initial correction data */
1477
1478         s->current_write_index_correction = cidx;
1479
1480         s->write_index_corrections[cidx].valid = TRUE;
1481         s->write_index_corrections[cidx].absolute = FALSE;
1482         s->write_index_corrections[cidx].corrupt = FALSE;
1483         s->write_index_corrections[cidx].tag = tag;
1484         s->write_index_corrections[cidx].value = 0;
1485     }
1486
1487     return o;
1488 }
1489
1490 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
1491     pa_stream *s = userdata;
1492
1493     pa_assert(pd);
1494     pa_assert(s);
1495     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1496
1497     pa_stream_ref(s);
1498
1499     if (command != PA_COMMAND_REPLY) {
1500         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1501             goto finish;
1502
1503         pa_stream_set_state(s, PA_STREAM_FAILED);
1504         goto finish;
1505     } else if (!pa_tagstruct_eof(t)) {
1506         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1507         goto finish;
1508     }
1509
1510     pa_stream_set_state(s, PA_STREAM_TERMINATED);
1511
1512 finish:
1513     pa_stream_unref(s);
1514 }
1515
1516 int pa_stream_disconnect(pa_stream *s) {
1517     pa_tagstruct *t;
1518     uint32_t tag;
1519
1520     pa_assert(s);
1521     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1522
1523     PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1524     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1525
1526     pa_stream_ref(s);
1527
1528     t = pa_tagstruct_command(
1529             s->context,
1530             s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1531             (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM),
1532             &tag);
1533     pa_tagstruct_putu32(t, s->channel);
1534     pa_pstream_send_tagstruct(s->context->pstream, t);
1535     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1536
1537     pa_stream_unref(s);
1538     return 0;
1539 }
1540
1541 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1542     pa_assert(s);
1543     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1544
1545     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1546         return;
1547
1548     s->read_callback = cb;
1549     s->read_userdata = userdata;
1550 }
1551
1552 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1553     pa_assert(s);
1554     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1555
1556     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1557         return;
1558
1559     s->write_callback = cb;
1560     s->write_userdata = userdata;
1561 }
1562
1563 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1564     pa_assert(s);
1565     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1566
1567     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1568         return;
1569
1570     s->state_callback = cb;
1571     s->state_userdata = userdata;
1572 }
1573
1574 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1575     pa_assert(s);
1576     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1577
1578     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1579         return;
1580
1581     s->overflow_callback = cb;
1582     s->overflow_userdata = userdata;
1583 }
1584
1585 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1586     pa_assert(s);
1587     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1588
1589     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1590         return;
1591
1592     s->underflow_callback = cb;
1593     s->underflow_userdata = userdata;
1594 }
1595
1596 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1597     pa_assert(s);
1598     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1599
1600     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1601         return;
1602
1603     s->latency_update_callback = cb;
1604     s->latency_update_userdata = userdata;
1605 }
1606
1607 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1608     pa_assert(s);
1609     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1610
1611     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1612         return;
1613
1614     s->moved_callback = cb;
1615     s->moved_userdata = userdata;
1616 }
1617
1618 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1619     pa_assert(s);
1620     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1621
1622     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1623         return;
1624
1625     s->suspended_callback = cb;
1626     s->suspended_userdata = userdata;
1627 }
1628
1629 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1630     pa_assert(s);
1631     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1632
1633     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1634         return;
1635
1636     s->started_callback = cb;
1637     s->started_userdata = userdata;
1638 }
1639
1640 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
1641     pa_operation *o = userdata;
1642     int success = 1;
1643
1644     pa_assert(pd);
1645     pa_assert(o);
1646     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1647
1648     if (!o->context)
1649         goto finish;
1650
1651     if (command != PA_COMMAND_REPLY) {
1652         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1653             goto finish;
1654
1655         success = 0;
1656     } else if (!pa_tagstruct_eof(t)) {
1657         pa_context_fail(o->context, PA_ERR_PROTOCOL);
1658         goto finish;
1659     }
1660
1661     if (o->callback) {
1662         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1663         cb(o->stream, success, o->userdata);
1664     }
1665
1666 finish:
1667     pa_operation_done(o);
1668     pa_operation_unref(o);
1669 }
1670
1671 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
1672     pa_operation *o;
1673     pa_tagstruct *t;
1674     uint32_t tag;
1675
1676     pa_assert(s);
1677     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1678
1679     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1680     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1681
1682     s->corked = b;
1683
1684     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1685
1686     t = pa_tagstruct_command(
1687             s->context,
1688             s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM,
1689             &tag);
1690     pa_tagstruct_putu32(t, s->channel);
1691     pa_tagstruct_put_boolean(t, !!b);
1692     pa_pstream_send_tagstruct(s->context->pstream, t);
1693     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1694
1695     if (s->smoother) {
1696         pa_usec_t x = pa_rtclock_usec();
1697
1698         if (s->timing_info_valid)
1699             x += s->timing_info.transport_usec;
1700
1701         if (s->suspended || s->corked)
1702             pa_smoother_pause(s->smoother, x);
1703     }
1704
1705     if (s->direction == PA_STREAM_PLAYBACK)
1706         invalidate_indexes(s, TRUE, FALSE);
1707
1708     return o;
1709 }
1710
1711 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
1712     pa_tagstruct *t;
1713     pa_operation *o;
1714     uint32_t tag;
1715
1716     pa_assert(s);
1717     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1718
1719     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1720
1721     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1722
1723     t = pa_tagstruct_command(s->context, command, &tag);
1724     pa_tagstruct_putu32(t, s->channel);
1725     pa_pstream_send_tagstruct(s->context->pstream, t);
1726     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1727
1728     return o;
1729 }
1730
1731 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1732     pa_operation *o;
1733
1734     pa_assert(s);
1735     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1736
1737     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1738     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1739
1740     if ((o = stream_send_simple_command(s, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM, cb, userdata))) {
1741
1742         if (s->direction == PA_STREAM_PLAYBACK) {
1743             if (s->write_index_corrections[s->current_write_index_correction].valid)
1744                 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1745
1746             if (s->timing_info_valid)
1747                 s->timing_info.write_index_corrupt = TRUE;
1748
1749             if (s->buffer_attr.prebuf > 0)
1750                 invalidate_indexes(s, TRUE, FALSE);
1751             else
1752                 request_auto_timing_update(s, TRUE);
1753
1754             if (s->smoother && s->buffer_attr.prebuf > 0) {
1755                 pa_usec_t x = pa_rtclock_usec();
1756
1757                 if (s->timing_info_valid)
1758                     x += s->timing_info.transport_usec;
1759
1760                 pa_smoother_pause(s->smoother, x);
1761             }
1762
1763         } else
1764             invalidate_indexes(s, FALSE, TRUE);
1765     }
1766
1767     return o;
1768 }
1769
1770 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1771     pa_operation *o;
1772
1773     pa_assert(s);
1774     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1775
1776     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1777     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1778     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
1779
1780     if ((o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
1781         invalidate_indexes(s, TRUE, FALSE);
1782
1783     return o;
1784 }
1785
1786 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1787     pa_operation *o;
1788
1789     pa_assert(s);
1790     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1791
1792     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1793     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1794     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
1795
1796     if ((o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
1797         invalidate_indexes(s, TRUE, FALSE);
1798
1799     return o;
1800 }
1801
1802 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
1803     pa_operation *o;
1804
1805     pa_assert(s);
1806     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1807     pa_assert(name);
1808
1809     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1810     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1811
1812     if (s->context->version >= 13) {
1813         pa_proplist *p = pa_proplist_new();
1814
1815         pa_proplist_sets(p, PA_PROP_APPLICATION_NAME, name);
1816         o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
1817         pa_proplist_free(p);
1818     } else {
1819         pa_tagstruct *t;
1820         uint32_t tag;
1821
1822         o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1823         t = pa_tagstruct_command(
1824                 s->context,
1825                 s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME,
1826                 &tag);
1827         pa_tagstruct_putu32(t, s->channel);
1828         pa_tagstruct_puts(t, name);
1829         pa_pstream_send_tagstruct(s->context->pstream, t);
1830         pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1831     }
1832
1833     return o;
1834 }
1835
1836 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
1837     pa_usec_t usec;
1838
1839     pa_assert(s);
1840     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1841
1842     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1843     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1844     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
1845     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
1846     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
1847
1848     if (s->smoother)
1849         usec = pa_smoother_get(s->smoother, pa_rtclock_usec());
1850     else
1851         usec = calc_time(s, FALSE);
1852
1853     /* Make sure the time runs monotonically */
1854     if (!(s->flags & PA_STREAM_NOT_MONOTONOUS)) {
1855         if (usec < s->previous_time)
1856             usec = s->previous_time;
1857         else
1858             s->previous_time = usec;
1859     }
1860
1861     if (r_usec)
1862         *r_usec = usec;
1863
1864     return 0;
1865 }
1866
1867 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
1868     pa_assert(s);
1869     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1870
1871     if (negative)
1872         *negative = 0;
1873
1874     if (a >= b)
1875         return a-b;
1876     else {
1877         if (negative && s->direction == PA_STREAM_RECORD) {
1878             *negative = 1;
1879             return b-a;
1880         } else
1881             return 0;
1882     }
1883 }
1884
1885 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
1886     pa_usec_t t, c;
1887     int r;
1888     int64_t cindex;
1889
1890     pa_assert(s);
1891     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1892     pa_assert(r_usec);
1893
1894     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1895     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1896     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
1897     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
1898     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
1899
1900     if ((r = pa_stream_get_time(s, &t)) < 0)
1901         return r;
1902
1903     if (s->direction == PA_STREAM_PLAYBACK)
1904         cindex = s->timing_info.write_index;
1905     else
1906         cindex = s->timing_info.read_index;
1907
1908     if (cindex < 0)
1909         cindex = 0;
1910
1911     c = pa_bytes_to_usec(cindex, &s->sample_spec);
1912
1913     if (s->direction == PA_STREAM_PLAYBACK)
1914         *r_usec = time_counter_diff(s, c, t, negative);
1915     else
1916         *r_usec = time_counter_diff(s, t, c, negative);
1917
1918     return 0;
1919 }
1920
1921 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
1922     pa_assert(s);
1923     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1924
1925     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1926     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1927     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_BADSTATE);
1928
1929     return &s->timing_info;
1930 }
1931
1932 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
1933     pa_assert(s);
1934     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1935
1936     return &s->sample_spec;
1937 }
1938
1939 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
1940     pa_assert(s);
1941     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1942
1943     return &s->channel_map;
1944 }
1945
1946 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
1947     pa_assert(s);
1948     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1949
1950     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1951     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1952     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
1953
1954     return &s->buffer_attr;
1955 }
1956
1957 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
1958     pa_operation *o = userdata;
1959     int success = 1;
1960
1961     pa_assert(pd);
1962     pa_assert(o);
1963     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1964
1965     if (!o->context)
1966         goto finish;
1967
1968     if (command != PA_COMMAND_REPLY) {
1969         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1970             goto finish;
1971
1972         success = 0;
1973     } else {
1974
1975         if (o->stream->direction == PA_STREAM_PLAYBACK) {
1976             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
1977                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
1978                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
1979                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
1980                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1981                 goto finish;
1982             }
1983         } else if (o->stream->direction == PA_STREAM_RECORD) {
1984             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
1985                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
1986                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1987                 goto finish;
1988             }
1989         }
1990
1991         if (!pa_tagstruct_eof(t)) {
1992             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1993             goto finish;
1994         }
1995     }
1996
1997     if (o->callback) {
1998         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1999         cb(o->stream, success, o->userdata);
2000     }
2001
2002 finish:
2003     pa_operation_done(o);
2004     pa_operation_unref(o);
2005 }
2006
2007
2008 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2009     pa_operation *o;
2010     pa_tagstruct *t;
2011     uint32_t tag;
2012
2013     pa_assert(s);
2014     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2015     pa_assert(attr);
2016
2017     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2018     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2019     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2020
2021     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2022
2023     t = pa_tagstruct_command(
2024             s->context,
2025             s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR,
2026             &tag);
2027     pa_tagstruct_putu32(t, s->channel);
2028
2029     pa_tagstruct_putu32(t, attr->maxlength);
2030
2031     if (s->direction == PA_STREAM_PLAYBACK)
2032         pa_tagstruct_put(
2033                 t,
2034                 PA_TAG_U32, attr->tlength,
2035                 PA_TAG_U32, attr->prebuf,
2036                 PA_TAG_U32, attr->minreq,
2037                 PA_TAG_INVALID);
2038     else
2039         pa_tagstruct_putu32(t, attr->fragsize);
2040
2041     if (s->context->version >= 13)
2042         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2043
2044     pa_pstream_send_tagstruct(s->context->pstream, t);
2045     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2046
2047     return o;
2048 }
2049
2050 uint32_t pa_stream_get_device_index(pa_stream *s) {
2051     pa_assert(s);
2052     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2053
2054     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2055     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2056     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2057     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2058
2059     return s->device_index;
2060 }
2061
2062 const char *pa_stream_get_device_name(pa_stream *s) {
2063     pa_assert(s);
2064     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2065
2066     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2067     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2068     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2069     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2070
2071     return s->device_name;
2072 }
2073
2074 int pa_stream_is_suspended(pa_stream *s) {
2075     pa_assert(s);
2076     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2077
2078     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2079     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2080     PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2081
2082     return s->suspended;
2083 }
2084
2085 int pa_stream_is_corked(pa_stream *s) {
2086     pa_assert(s);
2087     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2088
2089     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2090     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2091
2092     return s->corked;
2093 }
2094
2095 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
2096     pa_operation *o = userdata;
2097     int success = 1;
2098
2099     pa_assert(pd);
2100     pa_assert(o);
2101     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2102
2103     if (!o->context)
2104         goto finish;
2105
2106     if (command != PA_COMMAND_REPLY) {
2107         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2108             goto finish;
2109
2110         success = 0;
2111     } else {
2112
2113         if (!pa_tagstruct_eof(t)) {
2114             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2115             goto finish;
2116         }
2117     }
2118
2119     o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2120     pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2121
2122     if (o->callback) {
2123         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2124         cb(o->stream, success, o->userdata);
2125     }
2126
2127 finish:
2128     pa_operation_done(o);
2129     pa_operation_unref(o);
2130 }
2131
2132
2133 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2134     pa_operation *o;
2135     pa_tagstruct *t;
2136     uint32_t tag;
2137
2138     pa_assert(s);
2139     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2140
2141     PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2142     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2143     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2144     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2145     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2146
2147     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2148     o->private = PA_UINT_TO_PTR(rate);
2149
2150     t = pa_tagstruct_command(
2151             s->context,
2152             s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE,
2153             &tag);
2154     pa_tagstruct_putu32(t, s->channel);
2155     pa_tagstruct_putu32(t, rate);
2156
2157     pa_pstream_send_tagstruct(s->context->pstream, t);
2158     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2159
2160     return o;
2161 }
2162
2163 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2164     pa_operation *o;
2165     pa_tagstruct *t;
2166     uint32_t tag;
2167
2168     pa_assert(s);
2169     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2170
2171     PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2172     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2173     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2174     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2175
2176     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2177
2178     t = pa_tagstruct_command(
2179             s->context,
2180             s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST,
2181             &tag);
2182     pa_tagstruct_putu32(t, s->channel);
2183     pa_tagstruct_putu32(t, (uint32_t) mode);
2184     pa_tagstruct_put_proplist(t, p);
2185
2186     pa_pstream_send_tagstruct(s->context->pstream, t);
2187     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2188
2189     /* Please note that we don't update s->proplist here, because we
2190      * don't export that field */
2191
2192     return o;
2193 }
2194
2195 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2196     pa_operation *o;
2197     pa_tagstruct *t;
2198     uint32_t tag;
2199     const char * const*k;
2200
2201     pa_assert(s);
2202     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2203
2204     PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2205     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2206     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2207     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2208
2209     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2210
2211     t = pa_tagstruct_command(
2212             s->context,
2213             s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST,
2214             &tag);
2215     pa_tagstruct_putu32(t, s->channel);
2216
2217     for (k = keys; *k; k++)
2218         pa_tagstruct_puts(t, *k);
2219
2220     pa_tagstruct_puts(t, NULL);
2221
2222     pa_pstream_send_tagstruct(s->context->pstream, t);
2223     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2224
2225     /* Please note that we don't update s->proplist here, because we
2226      * don't export that field */
2227
2228     return o;
2229 }