libpulse: introduce PA_STREAM_RELATIVE_VOLUME
[profile/ivi/pulseaudio.git] / src / pulse / stream.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/rtclock.h>
34 #include <pulse/xmalloc.h>
35
36 #include <pulsecore/pstream-util.h>
37 #include <pulsecore/log.h>
38 #include <pulsecore/hashmap.h>
39 #include <pulsecore/macro.h>
40 #include <pulsecore/core-rtclock.h>
41
42 #include "fork-detect.h"
43 #include "internal.h"
44
45 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
46 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
47
48 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
49 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
50 #define SMOOTHER_MIN_HISTORY (4)
51
52 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
53     return pa_stream_new_with_proplist(c, name, ss, map, NULL);
54 }
55
56 static void reset_callbacks(pa_stream *s) {
57     s->read_callback = NULL;
58     s->read_userdata = NULL;
59     s->write_callback = NULL;
60     s->write_userdata = NULL;
61     s->state_callback = NULL;
62     s->state_userdata = NULL;
63     s->overflow_callback = NULL;
64     s->overflow_userdata = NULL;
65     s->underflow_callback = NULL;
66     s->underflow_userdata = NULL;
67     s->latency_update_callback = NULL;
68     s->latency_update_userdata = NULL;
69     s->moved_callback = NULL;
70     s->moved_userdata = NULL;
71     s->suspended_callback = NULL;
72     s->suspended_userdata = NULL;
73     s->started_callback = NULL;
74     s->started_userdata = NULL;
75     s->event_callback = NULL;
76     s->event_userdata = NULL;
77     s->buffer_attr_callback = NULL;
78     s->buffer_attr_userdata = NULL;
79 }
80
81 pa_stream *pa_stream_new_with_proplist(
82         pa_context *c,
83         const char *name,
84         const pa_sample_spec *ss,
85         const pa_channel_map *map,
86         pa_proplist *p) {
87
88     pa_stream *s;
89     int i;
90     pa_channel_map tmap;
91
92     pa_assert(c);
93     pa_assert(PA_REFCNT_VALUE(c) >= 1);
94
95     PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
96     PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
97     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
98     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
99     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
100     PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
101     PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
102
103     if (!map)
104         PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
105
106     s = pa_xnew(pa_stream, 1);
107     PA_REFCNT_INIT(s);
108     s->context = c;
109     s->mainloop = c->mainloop;
110
111     s->direction = PA_STREAM_NODIRECTION;
112     s->state = PA_STREAM_UNCONNECTED;
113     s->flags = 0;
114
115     s->sample_spec = *ss;
116     s->channel_map = *map;
117
118     s->direct_on_input = PA_INVALID_INDEX;
119
120     s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
121     if (name)
122         pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
123
124     s->channel = 0;
125     s->channel_valid = FALSE;
126     s->syncid = c->csyncid++;
127     s->stream_index = PA_INVALID_INDEX;
128
129     s->requested_bytes = 0;
130     memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
131
132     /* We initialize der target length here, so that if the user
133      * passes no explicit buffering metrics the default is similar to
134      * what older PA versions provided. */
135
136     s->buffer_attr.maxlength = (uint32_t) -1;
137     s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
138     s->buffer_attr.minreq = (uint32_t) -1;
139     s->buffer_attr.prebuf = (uint32_t) -1;
140     s->buffer_attr.fragsize = (uint32_t) -1;
141
142     s->device_index = PA_INVALID_INDEX;
143     s->device_name = NULL;
144     s->suspended = FALSE;
145     s->corked = FALSE;
146
147     s->write_memblock = NULL;
148     s->write_data = NULL;
149
150     pa_memchunk_reset(&s->peek_memchunk);
151     s->peek_data = NULL;
152     s->record_memblockq = NULL;
153
154     memset(&s->timing_info, 0, sizeof(s->timing_info));
155     s->timing_info_valid = FALSE;
156
157     s->previous_time = 0;
158
159     s->read_index_not_before = 0;
160     s->write_index_not_before = 0;
161     for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
162         s->write_index_corrections[i].valid = 0;
163     s->current_write_index_correction = 0;
164
165     s->auto_timing_update_event = NULL;
166     s->auto_timing_update_requested = FALSE;
167     s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
168
169     reset_callbacks(s);
170
171     s->smoother = NULL;
172
173     /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
174     PA_LLIST_PREPEND(pa_stream, c->streams, s);
175     pa_stream_ref(s);
176
177     return s;
178 }
179
180 static void stream_unlink(pa_stream *s) {
181     pa_operation *o, *n;
182     pa_assert(s);
183
184     if (!s->context)
185         return;
186
187     /* Detach from context */
188
189     /* Unref all operatio object that point to us */
190     for (o = s->context->operations; o; o = n) {
191         n = o->next;
192
193         if (o->stream == s)
194             pa_operation_cancel(o);
195     }
196
197     /* Drop all outstanding replies for this stream */
198     if (s->context->pdispatch)
199         pa_pdispatch_unregister_reply(s->context->pdispatch, s);
200
201     if (s->channel_valid) {
202         pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL);
203         s->channel = 0;
204         s->channel_valid = FALSE;
205     }
206
207     PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
208     pa_stream_unref(s);
209
210     s->context = NULL;
211
212     if (s->auto_timing_update_event) {
213         pa_assert(s->mainloop);
214         s->mainloop->time_free(s->auto_timing_update_event);
215     }
216
217     reset_callbacks(s);
218 }
219
220 static void stream_free(pa_stream *s) {
221     pa_assert(s);
222
223     stream_unlink(s);
224
225     if (s->write_memblock) {
226         pa_memblock_release(s->write_memblock);
227         pa_memblock_unref(s->write_data);
228     }
229
230     if (s->peek_memchunk.memblock) {
231         if (s->peek_data)
232             pa_memblock_release(s->peek_memchunk.memblock);
233         pa_memblock_unref(s->peek_memchunk.memblock);
234     }
235
236     if (s->record_memblockq)
237         pa_memblockq_free(s->record_memblockq);
238
239     if (s->proplist)
240         pa_proplist_free(s->proplist);
241
242     if (s->smoother)
243         pa_smoother_free(s->smoother);
244
245     pa_xfree(s->device_name);
246     pa_xfree(s);
247 }
248
249 void pa_stream_unref(pa_stream *s) {
250     pa_assert(s);
251     pa_assert(PA_REFCNT_VALUE(s) >= 1);
252
253     if (PA_REFCNT_DEC(s) <= 0)
254         stream_free(s);
255 }
256
257 pa_stream* pa_stream_ref(pa_stream *s) {
258     pa_assert(s);
259     pa_assert(PA_REFCNT_VALUE(s) >= 1);
260
261     PA_REFCNT_INC(s);
262     return s;
263 }
264
265 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
266     pa_assert(s);
267     pa_assert(PA_REFCNT_VALUE(s) >= 1);
268
269     return s->state;
270 }
271
272 pa_context* pa_stream_get_context(pa_stream *s) {
273     pa_assert(s);
274     pa_assert(PA_REFCNT_VALUE(s) >= 1);
275
276     return s->context;
277 }
278
279 uint32_t pa_stream_get_index(pa_stream *s) {
280     pa_assert(s);
281     pa_assert(PA_REFCNT_VALUE(s) >= 1);
282
283     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
284     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
285
286     return s->stream_index;
287 }
288
289 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
290     pa_assert(s);
291     pa_assert(PA_REFCNT_VALUE(s) >= 1);
292
293     if (s->state == st)
294         return;
295
296     pa_stream_ref(s);
297
298     s->state = st;
299
300     if (s->state_callback)
301         s->state_callback(s, s->state_userdata);
302
303     if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
304         stream_unlink(s);
305
306     pa_stream_unref(s);
307 }
308
309 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
310     pa_assert(s);
311     pa_assert(PA_REFCNT_VALUE(s) >= 1);
312
313     if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
314         return;
315
316     if (s->state == PA_STREAM_READY &&
317         (force || !s->auto_timing_update_requested)) {
318         pa_operation *o;
319
320 /*         pa_log("Automatically requesting new timing data"); */
321
322         if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
323             pa_operation_unref(o);
324             s->auto_timing_update_requested = TRUE;
325         }
326     }
327
328     if (s->auto_timing_update_event) {
329         if (force)
330             s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
331
332         pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
333
334         s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
335     }
336 }
337
338 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
339     pa_context *c = userdata;
340     pa_stream *s;
341     uint32_t channel;
342
343     pa_assert(pd);
344     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
345     pa_assert(t);
346     pa_assert(c);
347     pa_assert(PA_REFCNT_VALUE(c) >= 1);
348
349     pa_context_ref(c);
350
351     if (pa_tagstruct_getu32(t, &channel) < 0 ||
352         !pa_tagstruct_eof(t)) {
353         pa_context_fail(c, PA_ERR_PROTOCOL);
354         goto finish;
355     }
356
357     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel)))
358         goto finish;
359
360     if (s->state != PA_STREAM_READY)
361         goto finish;
362
363     pa_context_set_error(c, PA_ERR_KILLED);
364     pa_stream_set_state(s, PA_STREAM_FAILED);
365
366 finish:
367     pa_context_unref(c);
368 }
369
370 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
371     pa_usec_t x;
372
373     pa_assert(s);
374     pa_assert(!force_start || !force_stop);
375
376     if (!s->smoother)
377         return;
378
379     x = pa_rtclock_now();
380
381     if (s->timing_info_valid) {
382         if (aposteriori)
383             x -= s->timing_info.transport_usec;
384         else
385             x += s->timing_info.transport_usec;
386     }
387
388     if (s->suspended || s->corked || force_stop)
389         pa_smoother_pause(s->smoother, x);
390     else if (force_start || s->buffer_attr.prebuf == 0)
391         pa_smoother_resume(s->smoother, x, TRUE);
392
393
394     /* Please note that we have no idea if playback actually started
395      * if prebuf is non-zero! */
396 }
397
398 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
399     pa_context *c = userdata;
400     pa_stream *s;
401     uint32_t channel;
402     const char *dn;
403     pa_bool_t suspended;
404     uint32_t di;
405     pa_usec_t usec = 0;
406     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
407
408     pa_assert(pd);
409     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
410     pa_assert(t);
411     pa_assert(c);
412     pa_assert(PA_REFCNT_VALUE(c) >= 1);
413
414     pa_context_ref(c);
415
416     if (c->version < 12) {
417         pa_context_fail(c, PA_ERR_PROTOCOL);
418         goto finish;
419     }
420
421     if (pa_tagstruct_getu32(t, &channel) < 0 ||
422         pa_tagstruct_getu32(t, &di) < 0 ||
423         pa_tagstruct_gets(t, &dn) < 0 ||
424         pa_tagstruct_get_boolean(t, &suspended) < 0) {
425         pa_context_fail(c, PA_ERR_PROTOCOL);
426         goto finish;
427     }
428
429     if (c->version >= 13) {
430
431         if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
432             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
433                 pa_tagstruct_getu32(t, &fragsize) < 0 ||
434                 pa_tagstruct_get_usec(t, &usec) < 0) {
435                 pa_context_fail(c, PA_ERR_PROTOCOL);
436                 goto finish;
437             }
438         } else {
439             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
440                 pa_tagstruct_getu32(t, &tlength) < 0 ||
441                 pa_tagstruct_getu32(t, &prebuf) < 0 ||
442                 pa_tagstruct_getu32(t, &minreq) < 0 ||
443                 pa_tagstruct_get_usec(t, &usec) < 0) {
444                 pa_context_fail(c, PA_ERR_PROTOCOL);
445                 goto finish;
446             }
447         }
448     }
449
450     if (!pa_tagstruct_eof(t)) {
451         pa_context_fail(c, PA_ERR_PROTOCOL);
452         goto finish;
453     }
454
455     if (!dn || di == PA_INVALID_INDEX) {
456         pa_context_fail(c, PA_ERR_PROTOCOL);
457         goto finish;
458     }
459
460     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, channel)))
461         goto finish;
462
463     if (s->state != PA_STREAM_READY)
464         goto finish;
465
466     if (c->version >= 13) {
467         if (s->direction == PA_STREAM_RECORD)
468             s->timing_info.configured_source_usec = usec;
469         else
470             s->timing_info.configured_sink_usec = usec;
471
472         s->buffer_attr.maxlength = maxlength;
473         s->buffer_attr.fragsize = fragsize;
474         s->buffer_attr.tlength = tlength;
475         s->buffer_attr.prebuf = prebuf;
476         s->buffer_attr.minreq = minreq;
477     }
478
479     pa_xfree(s->device_name);
480     s->device_name = pa_xstrdup(dn);
481     s->device_index = di;
482
483     s->suspended = suspended;
484
485     check_smoother_status(s, TRUE, FALSE, FALSE);
486     request_auto_timing_update(s, TRUE);
487
488     if (s->moved_callback)
489         s->moved_callback(s, s->moved_userdata);
490
491 finish:
492     pa_context_unref(c);
493 }
494
495 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
496     pa_context *c = userdata;
497     pa_stream *s;
498     uint32_t channel;
499     pa_usec_t usec = 0;
500     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
501
502     pa_assert(pd);
503     pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
504     pa_assert(t);
505     pa_assert(c);
506     pa_assert(PA_REFCNT_VALUE(c) >= 1);
507
508     pa_context_ref(c);
509
510     if (c->version < 15) {
511         pa_context_fail(c, PA_ERR_PROTOCOL);
512         goto finish;
513     }
514
515     if (pa_tagstruct_getu32(t, &channel) < 0) {
516         pa_context_fail(c, PA_ERR_PROTOCOL);
517         goto finish;
518     }
519
520     if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
521         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
522             pa_tagstruct_getu32(t, &fragsize) < 0 ||
523             pa_tagstruct_get_usec(t, &usec) < 0) {
524             pa_context_fail(c, PA_ERR_PROTOCOL);
525             goto finish;
526         }
527     } else {
528         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
529             pa_tagstruct_getu32(t, &tlength) < 0 ||
530             pa_tagstruct_getu32(t, &prebuf) < 0 ||
531             pa_tagstruct_getu32(t, &minreq) < 0 ||
532             pa_tagstruct_get_usec(t, &usec) < 0) {
533             pa_context_fail(c, PA_ERR_PROTOCOL);
534             goto finish;
535         }
536     }
537
538     if (!pa_tagstruct_eof(t)) {
539         pa_context_fail(c, PA_ERR_PROTOCOL);
540         goto finish;
541     }
542
543     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, channel)))
544         goto finish;
545
546     if (s->state != PA_STREAM_READY)
547         goto finish;
548
549     if (s->direction == PA_STREAM_RECORD)
550         s->timing_info.configured_source_usec = usec;
551     else
552         s->timing_info.configured_sink_usec = usec;
553
554     s->buffer_attr.maxlength = maxlength;
555     s->buffer_attr.fragsize = fragsize;
556     s->buffer_attr.tlength = tlength;
557     s->buffer_attr.prebuf = prebuf;
558     s->buffer_attr.minreq = minreq;
559
560     request_auto_timing_update(s, TRUE);
561
562     if (s->buffer_attr_callback)
563         s->buffer_attr_callback(s, s->buffer_attr_userdata);
564
565 finish:
566     pa_context_unref(c);
567 }
568
569 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
570     pa_context *c = userdata;
571     pa_stream *s;
572     uint32_t channel;
573     pa_bool_t suspended;
574
575     pa_assert(pd);
576     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
577     pa_assert(t);
578     pa_assert(c);
579     pa_assert(PA_REFCNT_VALUE(c) >= 1);
580
581     pa_context_ref(c);
582
583     if (c->version < 12) {
584         pa_context_fail(c, PA_ERR_PROTOCOL);
585         goto finish;
586     }
587
588     if (pa_tagstruct_getu32(t, &channel) < 0 ||
589         pa_tagstruct_get_boolean(t, &suspended) < 0 ||
590         !pa_tagstruct_eof(t)) {
591         pa_context_fail(c, PA_ERR_PROTOCOL);
592         goto finish;
593     }
594
595     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, channel)))
596         goto finish;
597
598     if (s->state != PA_STREAM_READY)
599         goto finish;
600
601     s->suspended = suspended;
602
603     check_smoother_status(s, TRUE, FALSE, FALSE);
604     request_auto_timing_update(s, TRUE);
605
606     if (s->suspended_callback)
607         s->suspended_callback(s, s->suspended_userdata);
608
609 finish:
610     pa_context_unref(c);
611 }
612
613 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
614     pa_context *c = userdata;
615     pa_stream *s;
616     uint32_t channel;
617
618     pa_assert(pd);
619     pa_assert(command == PA_COMMAND_STARTED);
620     pa_assert(t);
621     pa_assert(c);
622     pa_assert(PA_REFCNT_VALUE(c) >= 1);
623
624     pa_context_ref(c);
625
626     if (c->version < 13) {
627         pa_context_fail(c, PA_ERR_PROTOCOL);
628         goto finish;
629     }
630
631     if (pa_tagstruct_getu32(t, &channel) < 0 ||
632         !pa_tagstruct_eof(t)) {
633         pa_context_fail(c, PA_ERR_PROTOCOL);
634         goto finish;
635     }
636
637     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
638         goto finish;
639
640     if (s->state != PA_STREAM_READY)
641         goto finish;
642
643     check_smoother_status(s, TRUE, TRUE, FALSE);
644     request_auto_timing_update(s, TRUE);
645
646     if (s->started_callback)
647         s->started_callback(s, s->started_userdata);
648
649 finish:
650     pa_context_unref(c);
651 }
652
653 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
654     pa_context *c = userdata;
655     pa_stream *s;
656     uint32_t channel;
657     pa_proplist *pl = NULL;
658     const char *event;
659
660     pa_assert(pd);
661     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
662     pa_assert(t);
663     pa_assert(c);
664     pa_assert(PA_REFCNT_VALUE(c) >= 1);
665
666     pa_context_ref(c);
667
668     if (c->version < 15) {
669         pa_context_fail(c, PA_ERR_PROTOCOL);
670         goto finish;
671     }
672
673     pl = pa_proplist_new();
674
675     if (pa_tagstruct_getu32(t, &channel) < 0 ||
676         pa_tagstruct_gets(t, &event) < 0 ||
677         pa_tagstruct_get_proplist(t, pl) < 0 ||
678         !pa_tagstruct_eof(t) || !event) {
679         pa_context_fail(c, PA_ERR_PROTOCOL);
680         goto finish;
681     }
682
683     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, channel)))
684         goto finish;
685
686     if (s->state != PA_STREAM_READY)
687         goto finish;
688
689     if (s->event_callback)
690         s->event_callback(s, event, pl, s->event_userdata);
691
692 finish:
693     pa_context_unref(c);
694
695     if (pl)
696         pa_proplist_free(pl);
697 }
698
699 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
700     pa_stream *s;
701     pa_context *c = userdata;
702     uint32_t bytes, channel;
703
704     pa_assert(pd);
705     pa_assert(command == PA_COMMAND_REQUEST);
706     pa_assert(t);
707     pa_assert(c);
708     pa_assert(PA_REFCNT_VALUE(c) >= 1);
709
710     pa_context_ref(c);
711
712     if (pa_tagstruct_getu32(t, &channel) < 0 ||
713         pa_tagstruct_getu32(t, &bytes) < 0 ||
714         !pa_tagstruct_eof(t)) {
715         pa_context_fail(c, PA_ERR_PROTOCOL);
716         goto finish;
717     }
718
719     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
720         goto finish;
721
722     if (s->state != PA_STREAM_READY)
723         goto finish;
724
725     s->requested_bytes += bytes;
726
727     if (s->requested_bytes > 0 && s->write_callback)
728         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
729
730 finish:
731     pa_context_unref(c);
732 }
733
734 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
735     pa_stream *s;
736     pa_context *c = userdata;
737     uint32_t channel;
738
739     pa_assert(pd);
740     pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
741     pa_assert(t);
742     pa_assert(c);
743     pa_assert(PA_REFCNT_VALUE(c) >= 1);
744
745     pa_context_ref(c);
746
747     if (pa_tagstruct_getu32(t, &channel) < 0 ||
748         !pa_tagstruct_eof(t)) {
749         pa_context_fail(c, PA_ERR_PROTOCOL);
750         goto finish;
751     }
752
753     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
754         goto finish;
755
756     if (s->state != PA_STREAM_READY)
757         goto finish;
758
759     if (s->buffer_attr.prebuf > 0)
760         check_smoother_status(s, TRUE, FALSE, TRUE);
761
762     request_auto_timing_update(s, TRUE);
763
764     if (command == PA_COMMAND_OVERFLOW) {
765         if (s->overflow_callback)
766             s->overflow_callback(s, s->overflow_userdata);
767     } else if (command == PA_COMMAND_UNDERFLOW) {
768         if (s->underflow_callback)
769             s->underflow_callback(s, s->underflow_userdata);
770     }
771
772  finish:
773     pa_context_unref(c);
774 }
775
776 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
777     pa_assert(s);
778     pa_assert(PA_REFCNT_VALUE(s) >= 1);
779
780 /*     pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
781
782     if (s->state != PA_STREAM_READY)
783         return;
784
785     if (w) {
786         s->write_index_not_before = s->context->ctag;
787
788         if (s->timing_info_valid)
789             s->timing_info.write_index_corrupt = TRUE;
790
791 /*         pa_log("write_index invalidated"); */
792     }
793
794     if (r) {
795         s->read_index_not_before = s->context->ctag;
796
797         if (s->timing_info_valid)
798             s->timing_info.read_index_corrupt = TRUE;
799
800 /*         pa_log("read_index invalidated"); */
801     }
802
803     request_auto_timing_update(s, TRUE);
804 }
805
806 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
807     pa_stream *s = userdata;
808
809     pa_assert(s);
810     pa_assert(PA_REFCNT_VALUE(s) >= 1);
811
812     pa_stream_ref(s);
813     request_auto_timing_update(s, FALSE);
814     pa_stream_unref(s);
815 }
816
817 static void create_stream_complete(pa_stream *s) {
818     pa_assert(s);
819     pa_assert(PA_REFCNT_VALUE(s) >= 1);
820     pa_assert(s->state == PA_STREAM_CREATING);
821
822     pa_stream_set_state(s, PA_STREAM_READY);
823
824     if (s->requested_bytes > 0 && s->write_callback)
825         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
826
827     if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
828         s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
829         pa_assert(!s->auto_timing_update_event);
830         s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
831
832         request_auto_timing_update(s, TRUE);
833     }
834
835     check_smoother_status(s, TRUE, FALSE, FALSE);
836 }
837
838 static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_sample_spec *ss) {
839     pa_assert(s);
840     pa_assert(attr);
841     pa_assert(ss);
842
843     if (s->context->version >= 13)
844         return;
845
846     /* Version older than 0.9.10 didn't do server side buffer_attr
847      * selection, hence we have to fake it on the client side. */
848
849     /* We choose fairly conservative values here, to not confuse
850      * old clients with extremely large playback buffers */
851
852     if (attr->maxlength == (uint32_t) -1)
853         attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
854
855     if (attr->tlength == (uint32_t) -1)
856         attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
857
858     if (attr->minreq == (uint32_t) -1)
859         attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
860
861     if (attr->prebuf == (uint32_t) -1)
862         attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
863
864     if (attr->fragsize == (uint32_t) -1)
865         attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
866 }
867
868 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
869     pa_stream *s = userdata;
870     uint32_t requested_bytes = 0;
871
872     pa_assert(pd);
873     pa_assert(s);
874     pa_assert(PA_REFCNT_VALUE(s) >= 1);
875     pa_assert(s->state == PA_STREAM_CREATING);
876
877     pa_stream_ref(s);
878
879     if (command != PA_COMMAND_REPLY) {
880         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
881             goto finish;
882
883         pa_stream_set_state(s, PA_STREAM_FAILED);
884         goto finish;
885     }
886
887     if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
888         s->channel == PA_INVALID_INDEX ||
889         ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 ||  s->stream_index == PA_INVALID_INDEX)) ||
890         ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
891         pa_context_fail(s->context, PA_ERR_PROTOCOL);
892         goto finish;
893     }
894
895     s->requested_bytes = (int64_t) requested_bytes;
896
897     if (s->context->version >= 9) {
898         if (s->direction == PA_STREAM_PLAYBACK) {
899             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
900                 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
901                 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
902                 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
903                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
904                 goto finish;
905             }
906         } else if (s->direction == PA_STREAM_RECORD) {
907             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
908                 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
909                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
910                 goto finish;
911             }
912         }
913     }
914
915     if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
916         pa_sample_spec ss;
917         pa_channel_map cm;
918         const char *dn = NULL;
919         pa_bool_t suspended;
920
921         if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
922             pa_tagstruct_get_channel_map(t, &cm) < 0 ||
923             pa_tagstruct_getu32(t, &s->device_index) < 0 ||
924             pa_tagstruct_gets(t, &dn) < 0 ||
925             pa_tagstruct_get_boolean(t, &suspended) < 0) {
926             pa_context_fail(s->context, PA_ERR_PROTOCOL);
927             goto finish;
928         }
929
930         if (!dn || s->device_index == PA_INVALID_INDEX ||
931             ss.channels != cm.channels ||
932             !pa_channel_map_valid(&cm) ||
933             !pa_sample_spec_valid(&ss) ||
934             (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
935             (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
936             (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))) {
937             pa_context_fail(s->context, PA_ERR_PROTOCOL);
938             goto finish;
939         }
940
941         pa_xfree(s->device_name);
942         s->device_name = pa_xstrdup(dn);
943         s->suspended = suspended;
944
945         s->channel_map = cm;
946         s->sample_spec = ss;
947     }
948
949     if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
950         pa_usec_t usec;
951
952         if (pa_tagstruct_get_usec(t, &usec) < 0) {
953             pa_context_fail(s->context, PA_ERR_PROTOCOL);
954             goto finish;
955         }
956
957         if (s->direction == PA_STREAM_RECORD)
958             s->timing_info.configured_source_usec = usec;
959         else
960             s->timing_info.configured_sink_usec = usec;
961     }
962
963     if (!pa_tagstruct_eof(t)) {
964         pa_context_fail(s->context, PA_ERR_PROTOCOL);
965         goto finish;
966     }
967
968     if (s->direction == PA_STREAM_RECORD) {
969         pa_assert(!s->record_memblockq);
970
971         s->record_memblockq = pa_memblockq_new(
972                 0,
973                 s->buffer_attr.maxlength,
974                 0,
975                 pa_frame_size(&s->sample_spec),
976                 1,
977                 0,
978                 0,
979                 NULL);
980     }
981
982     s->channel_valid = TRUE;
983     pa_dynarray_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, s->channel, s);
984
985     create_stream_complete(s);
986
987 finish:
988     pa_stream_unref(s);
989 }
990
991 static int create_stream(
992         pa_stream_direction_t direction,
993         pa_stream *s,
994         const char *dev,
995         const pa_buffer_attr *attr,
996         pa_stream_flags_t flags,
997         const pa_cvolume *volume,
998         pa_stream *sync_stream) {
999
1000     pa_tagstruct *t;
1001     uint32_t tag;
1002     pa_bool_t volume_set = FALSE;
1003
1004     pa_assert(s);
1005     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1006     pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1007
1008     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1009     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1010     PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1011     PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1012                                               PA_STREAM_INTERPOLATE_TIMING|
1013                                               PA_STREAM_NOT_MONOTONIC|
1014                                               PA_STREAM_AUTO_TIMING_UPDATE|
1015                                               PA_STREAM_NO_REMAP_CHANNELS|
1016                                               PA_STREAM_NO_REMIX_CHANNELS|
1017                                               PA_STREAM_FIX_FORMAT|
1018                                               PA_STREAM_FIX_RATE|
1019                                               PA_STREAM_FIX_CHANNELS|
1020                                               PA_STREAM_DONT_MOVE|
1021                                               PA_STREAM_VARIABLE_RATE|
1022                                               PA_STREAM_PEAK_DETECT|
1023                                               PA_STREAM_START_MUTED|
1024                                               PA_STREAM_ADJUST_LATENCY|
1025                                               PA_STREAM_EARLY_REQUESTS|
1026                                               PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1027                                               PA_STREAM_START_UNMUTED|
1028                                               PA_STREAM_FAIL_ON_SUSPEND|
1029                                               PA_STREAM_RELATIVE_VOLUME)), PA_ERR_INVALID);
1030
1031     PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1032     PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1033     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1034     /* Althought some of the other flags are not supported on older
1035      * version, we don't check for them here, because it doesn't hurt
1036      * when they are passed but actually not supported. This makes
1037      * client development easier */
1038
1039     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1040     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1041     PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
1042     PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1043     PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1044
1045     pa_stream_ref(s);
1046
1047     s->direction = direction;
1048     s->flags = flags;
1049     s->corked = !!(flags & PA_STREAM_START_CORKED);
1050
1051     if (sync_stream)
1052         s->syncid = sync_stream->syncid;
1053
1054     if (attr)
1055         s->buffer_attr = *attr;
1056     automatic_buffer_attr(s, &s->buffer_attr, &s->sample_spec);
1057
1058     if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1059         pa_usec_t x;
1060
1061         x = pa_rtclock_now();
1062
1063         pa_assert(!s->smoother);
1064         s->smoother = pa_smoother_new(
1065                 SMOOTHER_ADJUST_TIME,
1066                 SMOOTHER_HISTORY_TIME,
1067                 !(flags & PA_STREAM_NOT_MONOTONIC),
1068                 TRUE,
1069                 SMOOTHER_MIN_HISTORY,
1070                 x,
1071                 TRUE);
1072     }
1073
1074     if (!dev)
1075         dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1076
1077     t = pa_tagstruct_command(
1078             s->context,
1079             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1080             &tag);
1081
1082     if (s->context->version < 13)
1083         pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1084
1085     pa_tagstruct_put(
1086             t,
1087             PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1088             PA_TAG_CHANNEL_MAP, &s->channel_map,
1089             PA_TAG_U32, PA_INVALID_INDEX,
1090             PA_TAG_STRING, dev,
1091             PA_TAG_U32, s->buffer_attr.maxlength,
1092             PA_TAG_BOOLEAN, s->corked,
1093             PA_TAG_INVALID);
1094
1095     if (s->direction == PA_STREAM_PLAYBACK) {
1096         pa_cvolume cv;
1097
1098         pa_tagstruct_put(
1099                 t,
1100                 PA_TAG_U32, s->buffer_attr.tlength,
1101                 PA_TAG_U32, s->buffer_attr.prebuf,
1102                 PA_TAG_U32, s->buffer_attr.minreq,
1103                 PA_TAG_U32, s->syncid,
1104                 PA_TAG_INVALID);
1105
1106         volume_set = !!volume;
1107
1108         if (!volume)
1109             volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1110
1111         pa_tagstruct_put_cvolume(t, volume);
1112     } else
1113         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1114
1115     if (s->context->version >= 12) {
1116         pa_tagstruct_put(
1117                 t,
1118                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1119                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1120                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1121                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1122                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1123                 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1124                 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1125                 PA_TAG_INVALID);
1126     }
1127
1128     if (s->context->version >= 13) {
1129
1130         if (s->direction == PA_STREAM_PLAYBACK)
1131             pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1132         else
1133             pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1134
1135         pa_tagstruct_put(
1136                 t,
1137                 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1138                 PA_TAG_PROPLIST, s->proplist,
1139                 PA_TAG_INVALID);
1140
1141         if (s->direction == PA_STREAM_RECORD)
1142             pa_tagstruct_putu32(t, s->direct_on_input);
1143     }
1144
1145     if (s->context->version >= 14) {
1146
1147         if (s->direction == PA_STREAM_PLAYBACK)
1148             pa_tagstruct_put_boolean(t, volume_set);
1149
1150         pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1151     }
1152
1153     if (s->context->version >= 15) {
1154
1155         if (s->direction == PA_STREAM_PLAYBACK)
1156             pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1157
1158         pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1159         pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1160     }
1161
1162     if (s->context->version >= 17) {
1163
1164         if (s->direction == PA_STREAM_PLAYBACK)
1165             pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1166
1167     }
1168
1169     pa_pstream_send_tagstruct(s->context->pstream, t);
1170     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1171
1172     pa_stream_set_state(s, PA_STREAM_CREATING);
1173
1174     pa_stream_unref(s);
1175     return 0;
1176 }
1177
1178 int pa_stream_connect_playback(
1179         pa_stream *s,
1180         const char *dev,
1181         const pa_buffer_attr *attr,
1182         pa_stream_flags_t flags,
1183         const pa_cvolume *volume,
1184         pa_stream *sync_stream) {
1185
1186     pa_assert(s);
1187     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1188
1189     return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1190 }
1191
1192 int pa_stream_connect_record(
1193         pa_stream *s,
1194         const char *dev,
1195         const pa_buffer_attr *attr,
1196         pa_stream_flags_t flags) {
1197
1198     pa_assert(s);
1199     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1200
1201     return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1202 }
1203
1204 int pa_stream_begin_write(
1205         pa_stream *s,
1206         void **data,
1207         size_t *nbytes) {
1208
1209     pa_assert(s);
1210     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1211
1212     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1213     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1214     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1215     PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1216     PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1217
1218     if (*nbytes != (size_t) -1) {
1219         size_t m, fs;
1220
1221         m = pa_mempool_block_size_max(s->context->mempool);
1222         fs = pa_frame_size(&s->sample_spec);
1223
1224         m = (m / fs) * fs;
1225         if (*nbytes > m)
1226             *nbytes = m;
1227     }
1228
1229     if (!s->write_memblock) {
1230         s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1231         s->write_data = pa_memblock_acquire(s->write_memblock);
1232     }
1233
1234     *data = s->write_data;
1235     *nbytes = pa_memblock_get_length(s->write_memblock);
1236
1237     return 0;
1238 }
1239
1240 int pa_stream_cancel_write(
1241         pa_stream *s) {
1242
1243     pa_assert(s);
1244     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1245
1246     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1247     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1248     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1249     PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1250
1251     pa_assert(s->write_data);
1252
1253     pa_memblock_release(s->write_memblock);
1254     pa_memblock_unref(s->write_memblock);
1255     s->write_memblock = NULL;
1256     s->write_data = NULL;
1257
1258     return 0;
1259 }
1260
1261 int pa_stream_write(
1262         pa_stream *s,
1263         const void *data,
1264         size_t length,
1265         pa_free_cb_t free_cb,
1266         int64_t offset,
1267         pa_seek_mode_t seek) {
1268
1269     pa_assert(s);
1270     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1271     pa_assert(data);
1272
1273     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1274     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1275     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1276     PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1277     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1278     PA_CHECK_VALIDITY(s->context,
1279                       !s->write_memblock ||
1280                       ((data >= s->write_data) &&
1281                        ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1282                       PA_ERR_INVALID);
1283     PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1284
1285     if (s->write_memblock) {
1286         pa_memchunk chunk;
1287
1288         /* pa_stream_write_begin() was called before */
1289
1290         pa_memblock_release(s->write_memblock);
1291
1292         chunk.memblock = s->write_memblock;
1293         chunk.index = (const char *) data - (const char *) s->write_data;
1294         chunk.length = length;
1295
1296         s->write_memblock = NULL;
1297         s->write_data = NULL;
1298
1299         pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1300         pa_memblock_unref(chunk.memblock);
1301
1302     } else {
1303         pa_seek_mode_t t_seek = seek;
1304         int64_t t_offset = offset;
1305         size_t t_length = length;
1306         const void *t_data = data;
1307
1308         /* pa_stream_write_begin() was not called before */
1309
1310         while (t_length > 0) {
1311             pa_memchunk chunk;
1312
1313             chunk.index = 0;
1314
1315             if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1316                 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1317                 chunk.length = t_length;
1318             } else {
1319                 void *d;
1320
1321                 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1322                 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1323
1324                 d = pa_memblock_acquire(chunk.memblock);
1325                 memcpy(d, t_data, chunk.length);
1326                 pa_memblock_release(chunk.memblock);
1327             }
1328
1329             pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1330
1331             t_offset = 0;
1332             t_seek = PA_SEEK_RELATIVE;
1333
1334             t_data = (const uint8_t*) t_data + chunk.length;
1335             t_length -= chunk.length;
1336
1337             pa_memblock_unref(chunk.memblock);
1338         }
1339
1340         if (free_cb && pa_pstream_get_shm(s->context->pstream))
1341             free_cb((void*) data);
1342     }
1343
1344     /* This is obviously wrong since we ignore the seeking index . But
1345      * that's OK, the server side applies the same error */
1346     s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1347
1348     if (s->direction == PA_STREAM_PLAYBACK) {
1349
1350         /* Update latency request correction */
1351         if (s->write_index_corrections[s->current_write_index_correction].valid) {
1352
1353             if (seek == PA_SEEK_ABSOLUTE) {
1354                 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1355                 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1356                 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1357             } else if (seek == PA_SEEK_RELATIVE) {
1358                 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1359                     s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1360             } else
1361                 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1362         }
1363
1364         /* Update the write index in the already available latency data */
1365         if (s->timing_info_valid) {
1366
1367             if (seek == PA_SEEK_ABSOLUTE) {
1368                 s->timing_info.write_index_corrupt = FALSE;
1369                 s->timing_info.write_index = offset + (int64_t) length;
1370             } else if (seek == PA_SEEK_RELATIVE) {
1371                 if (!s->timing_info.write_index_corrupt)
1372                     s->timing_info.write_index += offset + (int64_t) length;
1373             } else
1374                 s->timing_info.write_index_corrupt = TRUE;
1375         }
1376
1377         if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1378             request_auto_timing_update(s, TRUE);
1379     }
1380
1381     return 0;
1382 }
1383
1384 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1385     pa_assert(s);
1386     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1387     pa_assert(data);
1388     pa_assert(length);
1389
1390     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1391     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1392     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1393
1394     if (!s->peek_memchunk.memblock) {
1395
1396         if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1397             *data = NULL;
1398             *length = 0;
1399             return 0;
1400         }
1401
1402         s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1403     }
1404
1405     pa_assert(s->peek_data);
1406     *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1407     *length = s->peek_memchunk.length;
1408     return 0;
1409 }
1410
1411 int pa_stream_drop(pa_stream *s) {
1412     pa_assert(s);
1413     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1414
1415     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1416     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1417     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1418     PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1419
1420     pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1421
1422     /* Fix the simulated local read index */
1423     if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1424         s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1425
1426     pa_assert(s->peek_data);
1427     pa_memblock_release(s->peek_memchunk.memblock);
1428     pa_memblock_unref(s->peek_memchunk.memblock);
1429     pa_memchunk_reset(&s->peek_memchunk);
1430
1431     return 0;
1432 }
1433
1434 size_t pa_stream_writable_size(pa_stream *s) {
1435     pa_assert(s);
1436     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1437
1438     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1439     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1440     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1441
1442     return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1443 }
1444
1445 size_t pa_stream_readable_size(pa_stream *s) {
1446     pa_assert(s);
1447     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1448
1449     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1450     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1451     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1452
1453     return pa_memblockq_get_length(s->record_memblockq);
1454 }
1455
1456 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1457     pa_operation *o;
1458     pa_tagstruct *t;
1459     uint32_t tag;
1460
1461     pa_assert(s);
1462     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1463
1464     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1465     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1466     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1467
1468     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1469
1470     t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1471     pa_tagstruct_putu32(t, s->channel);
1472     pa_pstream_send_tagstruct(s->context->pstream, t);
1473     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1474
1475     return o;
1476 }
1477
1478 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1479     pa_usec_t usec;
1480
1481     pa_assert(s);
1482     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1483     pa_assert(s->state == PA_STREAM_READY);
1484     pa_assert(s->direction != PA_STREAM_UPLOAD);
1485     pa_assert(s->timing_info_valid);
1486     pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1487     pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1488
1489     if (s->direction == PA_STREAM_PLAYBACK) {
1490         /* The last byte that was written into the output device
1491          * had this time value associated */
1492         usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1493
1494         if (!s->corked && !s->suspended) {
1495
1496             if (!ignore_transport)
1497                 /* Because the latency info took a little time to come
1498                  * to us, we assume that the real output time is actually
1499                  * a little ahead */
1500                 usec += s->timing_info.transport_usec;
1501
1502             /* However, the output device usually maintains a buffer
1503                too, hence the real sample currently played is a little
1504                back  */
1505             if (s->timing_info.sink_usec >= usec)
1506                 usec = 0;
1507             else
1508                 usec -= s->timing_info.sink_usec;
1509         }
1510
1511     } else {
1512         pa_assert(s->direction == PA_STREAM_RECORD);
1513
1514         /* The last byte written into the server side queue had
1515          * this time value associated */
1516         usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1517
1518         if (!s->corked && !s->suspended) {
1519
1520             if (!ignore_transport)
1521                 /* Add transport latency */
1522                 usec += s->timing_info.transport_usec;
1523
1524             /* Add latency of data in device buffer */
1525             usec += s->timing_info.source_usec;
1526
1527             /* If this is a monitor source, we need to correct the
1528              * time by the playback device buffer */
1529             if (s->timing_info.sink_usec >= usec)
1530                 usec = 0;
1531             else
1532                 usec -= s->timing_info.sink_usec;
1533         }
1534     }
1535
1536     return usec;
1537 }
1538
1539 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1540     pa_operation *o = userdata;
1541     struct timeval local, remote, now;
1542     pa_timing_info *i;
1543     pa_bool_t playing = FALSE;
1544     uint64_t underrun_for = 0, playing_for = 0;
1545
1546     pa_assert(pd);
1547     pa_assert(o);
1548     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1549
1550     if (!o->context || !o->stream)
1551         goto finish;
1552
1553     i = &o->stream->timing_info;
1554
1555     o->stream->timing_info_valid = FALSE;
1556     i->write_index_corrupt = TRUE;
1557     i->read_index_corrupt = TRUE;
1558
1559     if (command != PA_COMMAND_REPLY) {
1560         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1561             goto finish;
1562
1563     } else {
1564
1565         if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1566             pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1567             pa_tagstruct_get_boolean(t, &playing) < 0 ||
1568             pa_tagstruct_get_timeval(t, &local) < 0 ||
1569             pa_tagstruct_get_timeval(t, &remote) < 0 ||
1570             pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1571             pa_tagstruct_gets64(t, &i->read_index) < 0) {
1572
1573             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1574             goto finish;
1575         }
1576
1577         if (o->context->version >= 13 &&
1578             o->stream->direction == PA_STREAM_PLAYBACK)
1579             if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1580                 pa_tagstruct_getu64(t, &playing_for) < 0) {
1581
1582                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1583                 goto finish;
1584             }
1585
1586
1587         if (!pa_tagstruct_eof(t)) {
1588             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1589             goto finish;
1590         }
1591         o->stream->timing_info_valid = TRUE;
1592         i->write_index_corrupt = FALSE;
1593         i->read_index_corrupt = FALSE;
1594
1595         i->playing = (int) playing;
1596         i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1597
1598         pa_gettimeofday(&now);
1599
1600         /* Calculcate timestamps */
1601         if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1602             /* local and remote seem to have synchronized clocks */
1603
1604             if (o->stream->direction == PA_STREAM_PLAYBACK)
1605                 i->transport_usec = pa_timeval_diff(&remote, &local);
1606             else
1607                 i->transport_usec = pa_timeval_diff(&now, &remote);
1608
1609             i->synchronized_clocks = TRUE;
1610             i->timestamp = remote;
1611         } else {
1612             /* clocks are not synchronized, let's estimate latency then */
1613             i->transport_usec = pa_timeval_diff(&now, &local)/2;
1614             i->synchronized_clocks = FALSE;
1615             i->timestamp = local;
1616             pa_timeval_add(&i->timestamp, i->transport_usec);
1617         }
1618
1619         /* Invalidate read and write indexes if necessary */
1620         if (tag < o->stream->read_index_not_before)
1621             i->read_index_corrupt = TRUE;
1622
1623         if (tag < o->stream->write_index_not_before)
1624             i->write_index_corrupt = TRUE;
1625
1626         if (o->stream->direction == PA_STREAM_PLAYBACK) {
1627             /* Write index correction */
1628
1629             int n, j;
1630             uint32_t ctag = tag;
1631
1632             /* Go through the saved correction values and add up the
1633              * total correction.*/
1634             for (n = 0, j = o->stream->current_write_index_correction+1;
1635                  n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1636                  n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1637
1638                 /* Step over invalid data or out-of-date data */
1639                 if (!o->stream->write_index_corrections[j].valid ||
1640                     o->stream->write_index_corrections[j].tag < ctag)
1641                     continue;
1642
1643                 /* Make sure that everything is in order */
1644                 ctag = o->stream->write_index_corrections[j].tag+1;
1645
1646                 /* Now fix the write index */
1647                 if (o->stream->write_index_corrections[j].corrupt) {
1648                     /* A corrupting seek was made */
1649                     i->write_index_corrupt = TRUE;
1650                 } else if (o->stream->write_index_corrections[j].absolute) {
1651                     /* An absolute seek was made */
1652                     i->write_index = o->stream->write_index_corrections[j].value;
1653                     i->write_index_corrupt = FALSE;
1654                 } else if (!i->write_index_corrupt) {
1655                     /* A relative seek was made */
1656                     i->write_index += o->stream->write_index_corrections[j].value;
1657                 }
1658             }
1659
1660             /* Clear old correction entries */
1661             for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1662                 if (!o->stream->write_index_corrections[n].valid)
1663                     continue;
1664
1665                 if (o->stream->write_index_corrections[n].tag <= tag)
1666                     o->stream->write_index_corrections[n].valid = FALSE;
1667             }
1668         }
1669
1670         if (o->stream->direction == PA_STREAM_RECORD) {
1671             /* Read index correction */
1672
1673             if (!i->read_index_corrupt)
1674                 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1675         }
1676
1677         /* Update smoother */
1678         if (o->stream->smoother) {
1679             pa_usec_t u, x;
1680
1681             u = x = pa_rtclock_now() - i->transport_usec;
1682
1683             if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1684                 pa_usec_t su;
1685
1686                 /* If we weren't playing then it will take some time
1687                  * until the audio will actually come out through the
1688                  * speakers. Since we follow that timing here, we need
1689                  * to try to fix this up */
1690
1691                 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1692
1693                 if (su < i->sink_usec)
1694                     x += i->sink_usec - su;
1695             }
1696
1697             if (!i->playing)
1698                 pa_smoother_pause(o->stream->smoother, x);
1699
1700             /* Update the smoother */
1701             if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1702                 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1703                 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1704
1705             if (i->playing)
1706                 pa_smoother_resume(o->stream->smoother, x, TRUE);
1707         }
1708     }
1709
1710     o->stream->auto_timing_update_requested = FALSE;
1711
1712     if (o->stream->latency_update_callback)
1713         o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1714
1715     if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1716         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1717         cb(o->stream, o->stream->timing_info_valid, o->userdata);
1718     }
1719
1720 finish:
1721
1722     pa_operation_done(o);
1723     pa_operation_unref(o);
1724 }
1725
1726 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1727     uint32_t tag;
1728     pa_operation *o;
1729     pa_tagstruct *t;
1730     struct timeval now;
1731     int cidx = 0;
1732
1733     pa_assert(s);
1734     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1735
1736     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1737     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1738     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1739
1740     if (s->direction == PA_STREAM_PLAYBACK) {
1741         /* Find a place to store the write_index correction data for this entry */
1742         cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1743
1744         /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1745         PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1746     }
1747     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1748
1749     t = pa_tagstruct_command(
1750             s->context,
1751             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1752             &tag);
1753     pa_tagstruct_putu32(t, s->channel);
1754     pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1755
1756     pa_pstream_send_tagstruct(s->context->pstream, t);
1757     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1758
1759     if (s->direction == PA_STREAM_PLAYBACK) {
1760         /* Fill in initial correction data */
1761
1762         s->current_write_index_correction = cidx;
1763
1764         s->write_index_corrections[cidx].valid = TRUE;
1765         s->write_index_corrections[cidx].absolute = FALSE;
1766         s->write_index_corrections[cidx].corrupt = FALSE;
1767         s->write_index_corrections[cidx].tag = tag;
1768         s->write_index_corrections[cidx].value = 0;
1769     }
1770
1771     return o;
1772 }
1773
1774 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1775     pa_stream *s = userdata;
1776
1777     pa_assert(pd);
1778     pa_assert(s);
1779     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1780
1781     pa_stream_ref(s);
1782
1783     if (command != PA_COMMAND_REPLY) {
1784         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1785             goto finish;
1786
1787         pa_stream_set_state(s, PA_STREAM_FAILED);
1788         goto finish;
1789     } else if (!pa_tagstruct_eof(t)) {
1790         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1791         goto finish;
1792     }
1793
1794     pa_stream_set_state(s, PA_STREAM_TERMINATED);
1795
1796 finish:
1797     pa_stream_unref(s);
1798 }
1799
1800 int pa_stream_disconnect(pa_stream *s) {
1801     pa_tagstruct *t;
1802     uint32_t tag;
1803
1804     pa_assert(s);
1805     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1806
1807     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1808     PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1809     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1810
1811     pa_stream_ref(s);
1812
1813     t = pa_tagstruct_command(
1814             s->context,
1815             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1816                         (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1817             &tag);
1818     pa_tagstruct_putu32(t, s->channel);
1819     pa_pstream_send_tagstruct(s->context->pstream, t);
1820     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1821
1822     pa_stream_unref(s);
1823     return 0;
1824 }
1825
1826 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1827     pa_assert(s);
1828     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1829
1830     if (pa_detect_fork())
1831         return;
1832
1833     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1834         return;
1835
1836     s->read_callback = cb;
1837     s->read_userdata = userdata;
1838 }
1839
1840 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1841     pa_assert(s);
1842     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1843
1844     if (pa_detect_fork())
1845         return;
1846
1847     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1848         return;
1849
1850     s->write_callback = cb;
1851     s->write_userdata = userdata;
1852 }
1853
1854 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1855     pa_assert(s);
1856     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1857
1858     if (pa_detect_fork())
1859         return;
1860
1861     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1862         return;
1863
1864     s->state_callback = cb;
1865     s->state_userdata = userdata;
1866 }
1867
1868 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1869     pa_assert(s);
1870     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1871
1872     if (pa_detect_fork())
1873         return;
1874
1875     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1876         return;
1877
1878     s->overflow_callback = cb;
1879     s->overflow_userdata = userdata;
1880 }
1881
1882 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1883     pa_assert(s);
1884     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1885
1886     if (pa_detect_fork())
1887         return;
1888
1889     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1890         return;
1891
1892     s->underflow_callback = cb;
1893     s->underflow_userdata = userdata;
1894 }
1895
1896 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1897     pa_assert(s);
1898     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1899
1900     if (pa_detect_fork())
1901         return;
1902
1903     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1904         return;
1905
1906     s->latency_update_callback = cb;
1907     s->latency_update_userdata = userdata;
1908 }
1909
1910 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1911     pa_assert(s);
1912     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1913
1914     if (pa_detect_fork())
1915         return;
1916
1917     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1918         return;
1919
1920     s->moved_callback = cb;
1921     s->moved_userdata = userdata;
1922 }
1923
1924 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1925     pa_assert(s);
1926     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1927
1928     if (pa_detect_fork())
1929         return;
1930
1931     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1932         return;
1933
1934     s->suspended_callback = cb;
1935     s->suspended_userdata = userdata;
1936 }
1937
1938 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1939     pa_assert(s);
1940     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1941
1942     if (pa_detect_fork())
1943         return;
1944
1945     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1946         return;
1947
1948     s->started_callback = cb;
1949     s->started_userdata = userdata;
1950 }
1951
1952 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
1953     pa_assert(s);
1954     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1955
1956     if (pa_detect_fork())
1957         return;
1958
1959     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1960         return;
1961
1962     s->event_callback = cb;
1963     s->event_userdata = userdata;
1964 }
1965
1966 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1967     pa_assert(s);
1968     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1969
1970     if (pa_detect_fork())
1971         return;
1972
1973     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1974         return;
1975
1976     s->buffer_attr_callback = cb;
1977     s->buffer_attr_userdata = userdata;
1978 }
1979
1980 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1981     pa_operation *o = userdata;
1982     int success = 1;
1983
1984     pa_assert(pd);
1985     pa_assert(o);
1986     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1987
1988     if (!o->context)
1989         goto finish;
1990
1991     if (command != PA_COMMAND_REPLY) {
1992         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1993             goto finish;
1994
1995         success = 0;
1996     } else if (!pa_tagstruct_eof(t)) {
1997         pa_context_fail(o->context, PA_ERR_PROTOCOL);
1998         goto finish;
1999     }
2000
2001     if (o->callback) {
2002         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2003         cb(o->stream, success, o->userdata);
2004     }
2005
2006 finish:
2007     pa_operation_done(o);
2008     pa_operation_unref(o);
2009 }
2010
2011 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2012     pa_operation *o;
2013     pa_tagstruct *t;
2014     uint32_t tag;
2015
2016     pa_assert(s);
2017     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2018
2019     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2020     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2021     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2022
2023     s->corked = b;
2024
2025     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2026
2027     t = pa_tagstruct_command(
2028             s->context,
2029             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2030             &tag);
2031     pa_tagstruct_putu32(t, s->channel);
2032     pa_tagstruct_put_boolean(t, !!b);
2033     pa_pstream_send_tagstruct(s->context->pstream, t);
2034     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2035
2036     check_smoother_status(s, FALSE, FALSE, FALSE);
2037
2038     /* This might cause the indexes to hang/start again, hence
2039      * let's request a timing update */
2040     request_auto_timing_update(s, TRUE);
2041
2042     return o;
2043 }
2044
2045 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2046     pa_tagstruct *t;
2047     pa_operation *o;
2048     uint32_t tag;
2049
2050     pa_assert(s);
2051     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2052
2053     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2054     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2055
2056     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2057
2058     t = pa_tagstruct_command(s->context, command, &tag);
2059     pa_tagstruct_putu32(t, s->channel);
2060     pa_pstream_send_tagstruct(s->context->pstream, t);
2061     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2062
2063     return o;
2064 }
2065
2066 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2067     pa_operation *o;
2068
2069     pa_assert(s);
2070     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2071
2072     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2073     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2074     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2075
2076     if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2077         return NULL;
2078
2079     if (s->direction == PA_STREAM_PLAYBACK) {
2080
2081         if (s->write_index_corrections[s->current_write_index_correction].valid)
2082             s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2083
2084         if (s->buffer_attr.prebuf > 0)
2085             check_smoother_status(s, FALSE, FALSE, TRUE);
2086
2087         /* This will change the write index, but leave the
2088          * read index untouched. */
2089         invalidate_indexes(s, FALSE, TRUE);
2090
2091     } else
2092         /* For record streams this has no influence on the write
2093          * index, but the read index might jump. */
2094         invalidate_indexes(s, TRUE, FALSE);
2095
2096     return o;
2097 }
2098
2099 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2100     pa_operation *o;
2101
2102     pa_assert(s);
2103     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2104
2105     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2106     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2107     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2108     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2109
2110     if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2111         return NULL;
2112
2113     /* This might cause the read index to hang again, hence
2114      * let's request a timing update */
2115     request_auto_timing_update(s, TRUE);
2116
2117     return o;
2118 }
2119
2120 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2121     pa_operation *o;
2122
2123     pa_assert(s);
2124     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2125
2126     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2127     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2128     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2129     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2130
2131     if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2132         return NULL;
2133
2134     /* This might cause the read index to start moving again, hence
2135      * let's request a timing update */
2136     request_auto_timing_update(s, TRUE);
2137
2138     return o;
2139 }
2140
2141 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2142     pa_operation *o;
2143
2144     pa_assert(s);
2145     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2146     pa_assert(name);
2147
2148     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2149     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2150     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2151
2152     if (s->context->version >= 13) {
2153         pa_proplist *p = pa_proplist_new();
2154
2155         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2156         o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2157         pa_proplist_free(p);
2158     } else {
2159         pa_tagstruct *t;
2160         uint32_t tag;
2161
2162         o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2163         t = pa_tagstruct_command(
2164                 s->context,
2165                 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2166                 &tag);
2167         pa_tagstruct_putu32(t, s->channel);
2168         pa_tagstruct_puts(t, name);
2169         pa_pstream_send_tagstruct(s->context->pstream, t);
2170         pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2171     }
2172
2173     return o;
2174 }
2175
2176 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2177     pa_usec_t usec;
2178
2179     pa_assert(s);
2180     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2181
2182     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2183     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2184     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2185     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2186     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2187     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2188
2189     if (s->smoother)
2190         usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2191     else
2192         usec = calc_time(s, FALSE);
2193
2194     /* Make sure the time runs monotonically */
2195     if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2196         if (usec < s->previous_time)
2197             usec = s->previous_time;
2198         else
2199             s->previous_time = usec;
2200     }
2201
2202     if (r_usec)
2203         *r_usec = usec;
2204
2205     return 0;
2206 }
2207
2208 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2209     pa_assert(s);
2210     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2211
2212     if (negative)
2213         *negative = 0;
2214
2215     if (a >= b)
2216         return a-b;
2217     else {
2218         if (negative && s->direction == PA_STREAM_RECORD) {
2219             *negative = 1;
2220             return b-a;
2221         } else
2222             return 0;
2223     }
2224 }
2225
2226 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2227     pa_usec_t t, c;
2228     int r;
2229     int64_t cindex;
2230
2231     pa_assert(s);
2232     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2233     pa_assert(r_usec);
2234
2235     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2236     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2237     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2238     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2239     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2240     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2241
2242     if ((r = pa_stream_get_time(s, &t)) < 0)
2243         return r;
2244
2245     if (s->direction == PA_STREAM_PLAYBACK)
2246         cindex = s->timing_info.write_index;
2247     else
2248         cindex = s->timing_info.read_index;
2249
2250     if (cindex < 0)
2251         cindex = 0;
2252
2253     c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2254
2255     if (s->direction == PA_STREAM_PLAYBACK)
2256         *r_usec = time_counter_diff(s, c, t, negative);
2257     else
2258         *r_usec = time_counter_diff(s, t, c, negative);
2259
2260     return 0;
2261 }
2262
2263 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2264     pa_assert(s);
2265     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2266
2267     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2268     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2269     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2270     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2271
2272     return &s->timing_info;
2273 }
2274
2275 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2276     pa_assert(s);
2277     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2278
2279     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2280
2281     return &s->sample_spec;
2282 }
2283
2284 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2285     pa_assert(s);
2286     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2287
2288     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2289
2290     return &s->channel_map;
2291 }
2292
2293 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2294     pa_assert(s);
2295     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2296
2297     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2298     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2299     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2300
2301     return &s->buffer_attr;
2302 }
2303
2304 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2305     pa_operation *o = userdata;
2306     int success = 1;
2307
2308     pa_assert(pd);
2309     pa_assert(o);
2310     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2311
2312     if (!o->context)
2313         goto finish;
2314
2315     if (command != PA_COMMAND_REPLY) {
2316         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2317             goto finish;
2318
2319         success = 0;
2320     } else {
2321         if (o->stream->direction == PA_STREAM_PLAYBACK) {
2322             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2323                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2324                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2325                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2326                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2327                 goto finish;
2328             }
2329         } else if (o->stream->direction == PA_STREAM_RECORD) {
2330             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2331                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2332                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2333                 goto finish;
2334             }
2335         }
2336
2337         if (o->stream->context->version >= 13) {
2338             pa_usec_t usec;
2339
2340             if (pa_tagstruct_get_usec(t, &usec) < 0) {
2341                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2342                 goto finish;
2343             }
2344
2345             if (o->stream->direction == PA_STREAM_RECORD)
2346                 o->stream->timing_info.configured_source_usec = usec;
2347             else
2348                 o->stream->timing_info.configured_sink_usec = usec;
2349         }
2350
2351         if (!pa_tagstruct_eof(t)) {
2352             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2353             goto finish;
2354         }
2355     }
2356
2357     if (o->callback) {
2358         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2359         cb(o->stream, success, o->userdata);
2360     }
2361
2362 finish:
2363     pa_operation_done(o);
2364     pa_operation_unref(o);
2365 }
2366
2367
2368 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2369     pa_operation *o;
2370     pa_tagstruct *t;
2371     uint32_t tag;
2372
2373     pa_assert(s);
2374     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2375     pa_assert(attr);
2376
2377     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2378     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2379     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2380     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2381
2382     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2383
2384     t = pa_tagstruct_command(
2385             s->context,
2386             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2387             &tag);
2388     pa_tagstruct_putu32(t, s->channel);
2389
2390     pa_tagstruct_putu32(t, attr->maxlength);
2391
2392     if (s->direction == PA_STREAM_PLAYBACK)
2393         pa_tagstruct_put(
2394                 t,
2395                 PA_TAG_U32, attr->tlength,
2396                 PA_TAG_U32, attr->prebuf,
2397                 PA_TAG_U32, attr->minreq,
2398                 PA_TAG_INVALID);
2399     else
2400         pa_tagstruct_putu32(t, attr->fragsize);
2401
2402     if (s->context->version >= 13)
2403         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2404
2405     if (s->context->version >= 14)
2406         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2407
2408     pa_pstream_send_tagstruct(s->context->pstream, t);
2409     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2410
2411     /* This might cause changes in the read/write indexex, hence let's
2412      * request a timing update */
2413     request_auto_timing_update(s, TRUE);
2414
2415     return o;
2416 }
2417
2418 uint32_t pa_stream_get_device_index(pa_stream *s) {
2419     pa_assert(s);
2420     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2421
2422     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2423     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2424     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2425     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2426     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2427
2428     return s->device_index;
2429 }
2430
2431 const char *pa_stream_get_device_name(pa_stream *s) {
2432     pa_assert(s);
2433     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2434
2435     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2436     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2437     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2438     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2439     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2440
2441     return s->device_name;
2442 }
2443
2444 int pa_stream_is_suspended(pa_stream *s) {
2445     pa_assert(s);
2446     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2447
2448     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2449     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2450     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2451     PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2452
2453     return s->suspended;
2454 }
2455
2456 int pa_stream_is_corked(pa_stream *s) {
2457     pa_assert(s);
2458     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2459
2460     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2461     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2462     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2463
2464     return s->corked;
2465 }
2466
2467 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2468     pa_operation *o = userdata;
2469     int success = 1;
2470
2471     pa_assert(pd);
2472     pa_assert(o);
2473     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2474
2475     if (!o->context)
2476         goto finish;
2477
2478     if (command != PA_COMMAND_REPLY) {
2479         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2480             goto finish;
2481
2482         success = 0;
2483     } else {
2484
2485         if (!pa_tagstruct_eof(t)) {
2486             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2487             goto finish;
2488         }
2489     }
2490
2491     o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2492     pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2493
2494     if (o->callback) {
2495         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2496         cb(o->stream, success, o->userdata);
2497     }
2498
2499 finish:
2500     pa_operation_done(o);
2501     pa_operation_unref(o);
2502 }
2503
2504
2505 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2506     pa_operation *o;
2507     pa_tagstruct *t;
2508     uint32_t tag;
2509
2510     pa_assert(s);
2511     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2512
2513     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2514     PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2515     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2516     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2517     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2518     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2519
2520     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2521     o->private = PA_UINT_TO_PTR(rate);
2522
2523     t = pa_tagstruct_command(
2524             s->context,
2525             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2526             &tag);
2527     pa_tagstruct_putu32(t, s->channel);
2528     pa_tagstruct_putu32(t, rate);
2529
2530     pa_pstream_send_tagstruct(s->context->pstream, t);
2531     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2532
2533     return o;
2534 }
2535
2536 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2537     pa_operation *o;
2538     pa_tagstruct *t;
2539     uint32_t tag;
2540
2541     pa_assert(s);
2542     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2543
2544     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2545     PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2546     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2547     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2548     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2549
2550     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2551
2552     t = pa_tagstruct_command(
2553             s->context,
2554             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2555             &tag);
2556     pa_tagstruct_putu32(t, s->channel);
2557     pa_tagstruct_putu32(t, (uint32_t) mode);
2558     pa_tagstruct_put_proplist(t, p);
2559
2560     pa_pstream_send_tagstruct(s->context->pstream, t);
2561     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2562
2563     /* Please note that we don't update s->proplist here, because we
2564      * don't export that field */
2565
2566     return o;
2567 }
2568
2569 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2570     pa_operation *o;
2571     pa_tagstruct *t;
2572     uint32_t tag;
2573     const char * const*k;
2574
2575     pa_assert(s);
2576     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2577
2578     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2579     PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2580     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2581     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2582     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2583
2584     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2585
2586     t = pa_tagstruct_command(
2587             s->context,
2588             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2589             &tag);
2590     pa_tagstruct_putu32(t, s->channel);
2591
2592     for (k = keys; *k; k++)
2593         pa_tagstruct_puts(t, *k);
2594
2595     pa_tagstruct_puts(t, NULL);
2596
2597     pa_pstream_send_tagstruct(s->context->pstream, t);
2598     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2599
2600     /* Please note that we don't update s->proplist here, because we
2601      * don't export that field */
2602
2603     return o;
2604 }
2605
2606 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2607     pa_assert(s);
2608     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2609
2610     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2611     PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2612     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2613     PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2614
2615     s->direct_on_input = sink_input_idx;
2616
2617     return 0;
2618 }
2619
2620 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2621     pa_assert(s);
2622     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2623
2624     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2625     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2626     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2627
2628     return s->direct_on_input;
2629 }