perl -p -i -e 's/pa_rtclock_usec/pa_rtclock_now/g' `find . -name '*.[ch]'`
[profile/ivi/pulseaudio.git] / src / pulse / stream.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/xmalloc.h>
34
35 #include <pulsecore/pstream-util.h>
36 #include <pulsecore/log.h>
37 #include <pulsecore/hashmap.h>
38 #include <pulsecore/macro.h>
39 #include <pulsecore/rtclock.h>
40
41 #include "fork-detect.h"
42 #include "internal.h"
43
44 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
45 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
46
47 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
48 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
49 #define SMOOTHER_MIN_HISTORY (4)
50
51 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
52     return pa_stream_new_with_proplist(c, name, ss, map, NULL);
53 }
54
55 static void reset_callbacks(pa_stream *s) {
56     s->read_callback = NULL;
57     s->read_userdata = NULL;
58     s->write_callback = NULL;
59     s->write_userdata = NULL;
60     s->state_callback = NULL;
61     s->state_userdata = NULL;
62     s->overflow_callback = NULL;
63     s->overflow_userdata = NULL;
64     s->underflow_callback = NULL;
65     s->underflow_userdata = NULL;
66     s->latency_update_callback = NULL;
67     s->latency_update_userdata = NULL;
68     s->moved_callback = NULL;
69     s->moved_userdata = NULL;
70     s->suspended_callback = NULL;
71     s->suspended_userdata = NULL;
72     s->started_callback = NULL;
73     s->started_userdata = NULL;
74     s->event_callback = NULL;
75     s->event_userdata = NULL;
76     s->buffer_attr_callback = NULL;
77     s->buffer_attr_userdata = NULL;
78 }
79
80 pa_stream *pa_stream_new_with_proplist(
81         pa_context *c,
82         const char *name,
83         const pa_sample_spec *ss,
84         const pa_channel_map *map,
85         pa_proplist *p) {
86
87     pa_stream *s;
88     int i;
89     pa_channel_map tmap;
90
91     pa_assert(c);
92     pa_assert(PA_REFCNT_VALUE(c) >= 1);
93
94     PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
95     PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
96     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
97     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
98     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
99     PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
100     PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
101
102     if (!map)
103         PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
104
105     s = pa_xnew(pa_stream, 1);
106     PA_REFCNT_INIT(s);
107     s->context = c;
108     s->mainloop = c->mainloop;
109
110     s->direction = PA_STREAM_NODIRECTION;
111     s->state = PA_STREAM_UNCONNECTED;
112     s->flags = 0;
113
114     s->sample_spec = *ss;
115     s->channel_map = *map;
116
117     s->direct_on_input = PA_INVALID_INDEX;
118
119     s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
120     if (name)
121         pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
122
123     s->channel = 0;
124     s->channel_valid = FALSE;
125     s->syncid = c->csyncid++;
126     s->stream_index = PA_INVALID_INDEX;
127
128     s->requested_bytes = 0;
129     memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
130
131     /* We initialize der target length here, so that if the user
132      * passes no explicit buffering metrics the default is similar to
133      * what older PA versions provided. */
134
135     s->buffer_attr.maxlength = (uint32_t) -1;
136     s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
137     s->buffer_attr.minreq = (uint32_t) -1;
138     s->buffer_attr.prebuf = (uint32_t) -1;
139     s->buffer_attr.fragsize = (uint32_t) -1;
140
141     s->device_index = PA_INVALID_INDEX;
142     s->device_name = NULL;
143     s->suspended = FALSE;
144     s->corked = FALSE;
145
146     pa_memchunk_reset(&s->peek_memchunk);
147     s->peek_data = NULL;
148
149     s->record_memblockq = NULL;
150
151
152     memset(&s->timing_info, 0, sizeof(s->timing_info));
153     s->timing_info_valid = FALSE;
154
155     s->previous_time = 0;
156
157     s->read_index_not_before = 0;
158     s->write_index_not_before = 0;
159     for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
160         s->write_index_corrections[i].valid = 0;
161     s->current_write_index_correction = 0;
162
163     s->auto_timing_update_event = NULL;
164     s->auto_timing_update_requested = FALSE;
165     s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
166
167     reset_callbacks(s);
168
169     s->smoother = NULL;
170
171     /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
172     PA_LLIST_PREPEND(pa_stream, c->streams, s);
173     pa_stream_ref(s);
174
175     return s;
176 }
177
178 static void stream_unlink(pa_stream *s) {
179     pa_operation *o, *n;
180     pa_assert(s);
181
182     if (!s->context)
183         return;
184
185     /* Detach from context */
186
187     /* Unref all operatio object that point to us */
188     for (o = s->context->operations; o; o = n) {
189         n = o->next;
190
191         if (o->stream == s)
192             pa_operation_cancel(o);
193     }
194
195     /* Drop all outstanding replies for this stream */
196     if (s->context->pdispatch)
197         pa_pdispatch_unregister_reply(s->context->pdispatch, s);
198
199     if (s->channel_valid) {
200         pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL);
201         s->channel = 0;
202         s->channel_valid = FALSE;
203     }
204
205     PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
206     pa_stream_unref(s);
207
208     s->context = NULL;
209
210     if (s->auto_timing_update_event) {
211         pa_assert(s->mainloop);
212         s->mainloop->time_free(s->auto_timing_update_event);
213     }
214
215     reset_callbacks(s);
216 }
217
218 static void stream_free(pa_stream *s) {
219     pa_assert(s);
220
221     stream_unlink(s);
222
223     if (s->peek_memchunk.memblock) {
224         if (s->peek_data)
225             pa_memblock_release(s->peek_memchunk.memblock);
226         pa_memblock_unref(s->peek_memchunk.memblock);
227     }
228
229     if (s->record_memblockq)
230         pa_memblockq_free(s->record_memblockq);
231
232     if (s->proplist)
233         pa_proplist_free(s->proplist);
234
235     if (s->smoother)
236         pa_smoother_free(s->smoother);
237
238     pa_xfree(s->device_name);
239     pa_xfree(s);
240 }
241
242 void pa_stream_unref(pa_stream *s) {
243     pa_assert(s);
244     pa_assert(PA_REFCNT_VALUE(s) >= 1);
245
246     if (PA_REFCNT_DEC(s) <= 0)
247         stream_free(s);
248 }
249
250 pa_stream* pa_stream_ref(pa_stream *s) {
251     pa_assert(s);
252     pa_assert(PA_REFCNT_VALUE(s) >= 1);
253
254     PA_REFCNT_INC(s);
255     return s;
256 }
257
258 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
259     pa_assert(s);
260     pa_assert(PA_REFCNT_VALUE(s) >= 1);
261
262     return s->state;
263 }
264
265 pa_context* pa_stream_get_context(pa_stream *s) {
266     pa_assert(s);
267     pa_assert(PA_REFCNT_VALUE(s) >= 1);
268
269     return s->context;
270 }
271
272 uint32_t pa_stream_get_index(pa_stream *s) {
273     pa_assert(s);
274     pa_assert(PA_REFCNT_VALUE(s) >= 1);
275
276     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
277     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
278
279     return s->stream_index;
280 }
281
282 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
283     pa_assert(s);
284     pa_assert(PA_REFCNT_VALUE(s) >= 1);
285
286     if (s->state == st)
287         return;
288
289     pa_stream_ref(s);
290
291     s->state = st;
292
293     if (s->state_callback)
294         s->state_callback(s, s->state_userdata);
295
296     if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
297         stream_unlink(s);
298
299     pa_stream_unref(s);
300 }
301
302 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
303     pa_assert(s);
304     pa_assert(PA_REFCNT_VALUE(s) >= 1);
305
306     if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
307         return;
308
309     if (s->state == PA_STREAM_READY &&
310         (force || !s->auto_timing_update_requested)) {
311         pa_operation *o;
312
313 /*         pa_log("Automatically requesting new timing data"); */
314
315         if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
316             pa_operation_unref(o);
317             s->auto_timing_update_requested = TRUE;
318         }
319     }
320
321     if (s->auto_timing_update_event) {
322         struct timeval next;
323
324         if (force)
325             s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
326
327         pa_gettimeofday(&next);
328         pa_timeval_add(&next, s->auto_timing_interval_usec);
329         s->mainloop->time_restart(s->auto_timing_update_event, &next);
330
331         s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
332     }
333 }
334
335 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
336     pa_context *c = userdata;
337     pa_stream *s;
338     uint32_t channel;
339
340     pa_assert(pd);
341     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
342     pa_assert(t);
343     pa_assert(c);
344     pa_assert(PA_REFCNT_VALUE(c) >= 1);
345
346     pa_context_ref(c);
347
348     if (pa_tagstruct_getu32(t, &channel) < 0 ||
349         !pa_tagstruct_eof(t)) {
350         pa_context_fail(c, PA_ERR_PROTOCOL);
351         goto finish;
352     }
353
354     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel)))
355         goto finish;
356
357     if (s->state != PA_STREAM_READY)
358         goto finish;
359
360     pa_context_set_error(c, PA_ERR_KILLED);
361     pa_stream_set_state(s, PA_STREAM_FAILED);
362
363 finish:
364     pa_context_unref(c);
365 }
366
367 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
368     pa_usec_t x;
369
370     pa_assert(s);
371     pa_assert(!force_start || !force_stop);
372
373     if (!s->smoother)
374         return;
375
376     x = pa_rtclock_now();
377
378     if (s->timing_info_valid) {
379         if (aposteriori)
380             x -= s->timing_info.transport_usec;
381         else
382             x += s->timing_info.transport_usec;
383     }
384
385     if (s->suspended || s->corked || force_stop)
386         pa_smoother_pause(s->smoother, x);
387     else if (force_start || s->buffer_attr.prebuf == 0)
388         pa_smoother_resume(s->smoother, x, TRUE);
389
390
391     /* Please note that we have no idea if playback actually started
392      * if prebuf is non-zero! */
393 }
394
395 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
396     pa_context *c = userdata;
397     pa_stream *s;
398     uint32_t channel;
399     const char *dn;
400     pa_bool_t suspended;
401     uint32_t di;
402     pa_usec_t usec = 0;
403     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
404
405     pa_assert(pd);
406     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
407     pa_assert(t);
408     pa_assert(c);
409     pa_assert(PA_REFCNT_VALUE(c) >= 1);
410
411     pa_context_ref(c);
412
413     if (c->version < 12) {
414         pa_context_fail(c, PA_ERR_PROTOCOL);
415         goto finish;
416     }
417
418     if (pa_tagstruct_getu32(t, &channel) < 0 ||
419         pa_tagstruct_getu32(t, &di) < 0 ||
420         pa_tagstruct_gets(t, &dn) < 0 ||
421         pa_tagstruct_get_boolean(t, &suspended) < 0) {
422         pa_context_fail(c, PA_ERR_PROTOCOL);
423         goto finish;
424     }
425
426     if (c->version >= 13) {
427
428         if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
429             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
430                 pa_tagstruct_getu32(t, &fragsize) < 0 ||
431                 pa_tagstruct_get_usec(t, &usec) < 0) {
432                 pa_context_fail(c, PA_ERR_PROTOCOL);
433                 goto finish;
434             }
435         } else {
436             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
437                 pa_tagstruct_getu32(t, &tlength) < 0 ||
438                 pa_tagstruct_getu32(t, &prebuf) < 0 ||
439                 pa_tagstruct_getu32(t, &minreq) < 0 ||
440                 pa_tagstruct_get_usec(t, &usec) < 0) {
441                 pa_context_fail(c, PA_ERR_PROTOCOL);
442                 goto finish;
443             }
444         }
445     }
446
447     if (!pa_tagstruct_eof(t)) {
448         pa_context_fail(c, PA_ERR_PROTOCOL);
449         goto finish;
450     }
451
452     if (!dn || di == PA_INVALID_INDEX) {
453         pa_context_fail(c, PA_ERR_PROTOCOL);
454         goto finish;
455     }
456
457     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, channel)))
458         goto finish;
459
460     if (s->state != PA_STREAM_READY)
461         goto finish;
462
463     if (c->version >= 13) {
464         if (s->direction == PA_STREAM_RECORD)
465             s->timing_info.configured_source_usec = usec;
466         else
467             s->timing_info.configured_sink_usec = usec;
468
469         s->buffer_attr.maxlength = maxlength;
470         s->buffer_attr.fragsize = fragsize;
471         s->buffer_attr.tlength = tlength;
472         s->buffer_attr.prebuf = prebuf;
473         s->buffer_attr.minreq = minreq;
474     }
475
476     pa_xfree(s->device_name);
477     s->device_name = pa_xstrdup(dn);
478     s->device_index = di;
479
480     s->suspended = suspended;
481
482     check_smoother_status(s, TRUE, FALSE, FALSE);
483     request_auto_timing_update(s, TRUE);
484
485     if (s->moved_callback)
486         s->moved_callback(s, s->moved_userdata);
487
488 finish:
489     pa_context_unref(c);
490 }
491
492 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
493     pa_context *c = userdata;
494     pa_stream *s;
495     uint32_t channel;
496     pa_usec_t usec = 0;
497     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
498
499     pa_assert(pd);
500     pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
501     pa_assert(t);
502     pa_assert(c);
503     pa_assert(PA_REFCNT_VALUE(c) >= 1);
504
505     pa_context_ref(c);
506
507     if (c->version < 15) {
508         pa_context_fail(c, PA_ERR_PROTOCOL);
509         goto finish;
510     }
511
512     if (pa_tagstruct_getu32(t, &channel) < 0) {
513         pa_context_fail(c, PA_ERR_PROTOCOL);
514         goto finish;
515     }
516
517     if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
518         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
519             pa_tagstruct_getu32(t, &fragsize) < 0 ||
520             pa_tagstruct_get_usec(t, &usec) < 0) {
521             pa_context_fail(c, PA_ERR_PROTOCOL);
522             goto finish;
523         }
524     } else {
525         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
526             pa_tagstruct_getu32(t, &tlength) < 0 ||
527             pa_tagstruct_getu32(t, &prebuf) < 0 ||
528             pa_tagstruct_getu32(t, &minreq) < 0 ||
529             pa_tagstruct_get_usec(t, &usec) < 0) {
530             pa_context_fail(c, PA_ERR_PROTOCOL);
531             goto finish;
532         }
533     }
534
535     if (!pa_tagstruct_eof(t)) {
536         pa_context_fail(c, PA_ERR_PROTOCOL);
537         goto finish;
538     }
539
540     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, channel)))
541         goto finish;
542
543     if (s->state != PA_STREAM_READY)
544         goto finish;
545
546     if (s->direction == PA_STREAM_RECORD)
547         s->timing_info.configured_source_usec = usec;
548     else
549         s->timing_info.configured_sink_usec = usec;
550
551     s->buffer_attr.maxlength = maxlength;
552     s->buffer_attr.fragsize = fragsize;
553     s->buffer_attr.tlength = tlength;
554     s->buffer_attr.prebuf = prebuf;
555     s->buffer_attr.minreq = minreq;
556
557     request_auto_timing_update(s, TRUE);
558
559     if (s->buffer_attr_callback)
560         s->buffer_attr_callback(s, s->buffer_attr_userdata);
561
562 finish:
563     pa_context_unref(c);
564 }
565
566 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
567     pa_context *c = userdata;
568     pa_stream *s;
569     uint32_t channel;
570     pa_bool_t suspended;
571
572     pa_assert(pd);
573     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
574     pa_assert(t);
575     pa_assert(c);
576     pa_assert(PA_REFCNT_VALUE(c) >= 1);
577
578     pa_context_ref(c);
579
580     if (c->version < 12) {
581         pa_context_fail(c, PA_ERR_PROTOCOL);
582         goto finish;
583     }
584
585     if (pa_tagstruct_getu32(t, &channel) < 0 ||
586         pa_tagstruct_get_boolean(t, &suspended) < 0 ||
587         !pa_tagstruct_eof(t)) {
588         pa_context_fail(c, PA_ERR_PROTOCOL);
589         goto finish;
590     }
591
592     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, channel)))
593         goto finish;
594
595     if (s->state != PA_STREAM_READY)
596         goto finish;
597
598     s->suspended = suspended;
599
600     check_smoother_status(s, TRUE, FALSE, FALSE);
601     request_auto_timing_update(s, TRUE);
602
603     if (s->suspended_callback)
604         s->suspended_callback(s, s->suspended_userdata);
605
606 finish:
607     pa_context_unref(c);
608 }
609
610 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
611     pa_context *c = userdata;
612     pa_stream *s;
613     uint32_t channel;
614
615     pa_assert(pd);
616     pa_assert(command == PA_COMMAND_STARTED);
617     pa_assert(t);
618     pa_assert(c);
619     pa_assert(PA_REFCNT_VALUE(c) >= 1);
620
621     pa_context_ref(c);
622
623     if (c->version < 13) {
624         pa_context_fail(c, PA_ERR_PROTOCOL);
625         goto finish;
626     }
627
628     if (pa_tagstruct_getu32(t, &channel) < 0 ||
629         !pa_tagstruct_eof(t)) {
630         pa_context_fail(c, PA_ERR_PROTOCOL);
631         goto finish;
632     }
633
634     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
635         goto finish;
636
637     if (s->state != PA_STREAM_READY)
638         goto finish;
639
640     check_smoother_status(s, TRUE, TRUE, FALSE);
641     request_auto_timing_update(s, TRUE);
642
643     if (s->started_callback)
644         s->started_callback(s, s->started_userdata);
645
646 finish:
647     pa_context_unref(c);
648 }
649
650 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
651     pa_context *c = userdata;
652     pa_stream *s;
653     uint32_t channel;
654     pa_proplist *pl = NULL;
655     const char *event;
656
657     pa_assert(pd);
658     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
659     pa_assert(t);
660     pa_assert(c);
661     pa_assert(PA_REFCNT_VALUE(c) >= 1);
662
663     pa_context_ref(c);
664
665     if (c->version < 15) {
666         pa_context_fail(c, PA_ERR_PROTOCOL);
667         goto finish;
668     }
669
670     pl = pa_proplist_new();
671
672     if (pa_tagstruct_getu32(t, &channel) < 0 ||
673         pa_tagstruct_gets(t, &event) < 0 ||
674         pa_tagstruct_get_proplist(t, pl) < 0 ||
675         !pa_tagstruct_eof(t) || !event) {
676         pa_context_fail(c, PA_ERR_PROTOCOL);
677         goto finish;
678     }
679
680     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, channel)))
681         goto finish;
682
683     if (s->state != PA_STREAM_READY)
684         goto finish;
685
686     if (s->event_callback)
687         s->event_callback(s, event, pl, s->event_userdata);
688
689 finish:
690     pa_context_unref(c);
691
692     if (pl)
693         pa_proplist_free(pl);
694 }
695
696 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
697     pa_stream *s;
698     pa_context *c = userdata;
699     uint32_t bytes, channel;
700
701     pa_assert(pd);
702     pa_assert(command == PA_COMMAND_REQUEST);
703     pa_assert(t);
704     pa_assert(c);
705     pa_assert(PA_REFCNT_VALUE(c) >= 1);
706
707     pa_context_ref(c);
708
709     if (pa_tagstruct_getu32(t, &channel) < 0 ||
710         pa_tagstruct_getu32(t, &bytes) < 0 ||
711         !pa_tagstruct_eof(t)) {
712         pa_context_fail(c, PA_ERR_PROTOCOL);
713         goto finish;
714     }
715
716     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
717         goto finish;
718
719     if (s->state != PA_STREAM_READY)
720         goto finish;
721
722     s->requested_bytes += bytes;
723
724     if (s->requested_bytes > 0 && s->write_callback)
725         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
726
727 finish:
728     pa_context_unref(c);
729 }
730
731 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
732     pa_stream *s;
733     pa_context *c = userdata;
734     uint32_t channel;
735
736     pa_assert(pd);
737     pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
738     pa_assert(t);
739     pa_assert(c);
740     pa_assert(PA_REFCNT_VALUE(c) >= 1);
741
742     pa_context_ref(c);
743
744     if (pa_tagstruct_getu32(t, &channel) < 0 ||
745         !pa_tagstruct_eof(t)) {
746         pa_context_fail(c, PA_ERR_PROTOCOL);
747         goto finish;
748     }
749
750     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
751         goto finish;
752
753     if (s->state != PA_STREAM_READY)
754         goto finish;
755
756     if (s->buffer_attr.prebuf > 0)
757         check_smoother_status(s, TRUE, FALSE, TRUE);
758
759     request_auto_timing_update(s, TRUE);
760
761     if (command == PA_COMMAND_OVERFLOW) {
762         if (s->overflow_callback)
763             s->overflow_callback(s, s->overflow_userdata);
764     } else if (command == PA_COMMAND_UNDERFLOW) {
765         if (s->underflow_callback)
766             s->underflow_callback(s, s->underflow_userdata);
767     }
768
769  finish:
770     pa_context_unref(c);
771 }
772
773 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
774     pa_assert(s);
775     pa_assert(PA_REFCNT_VALUE(s) >= 1);
776
777 /*     pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
778
779     if (s->state != PA_STREAM_READY)
780         return;
781
782     if (w) {
783         s->write_index_not_before = s->context->ctag;
784
785         if (s->timing_info_valid)
786             s->timing_info.write_index_corrupt = TRUE;
787
788 /*         pa_log("write_index invalidated"); */
789     }
790
791     if (r) {
792         s->read_index_not_before = s->context->ctag;
793
794         if (s->timing_info_valid)
795             s->timing_info.read_index_corrupt = TRUE;
796
797 /*         pa_log("read_index invalidated"); */
798     }
799
800     request_auto_timing_update(s, TRUE);
801 }
802
803 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *tv, void *userdata) {
804     pa_stream *s = userdata;
805
806     pa_assert(s);
807     pa_assert(PA_REFCNT_VALUE(s) >= 1);
808
809     pa_stream_ref(s);
810     request_auto_timing_update(s, FALSE);
811     pa_stream_unref(s);
812 }
813
814 static void create_stream_complete(pa_stream *s) {
815     pa_assert(s);
816     pa_assert(PA_REFCNT_VALUE(s) >= 1);
817     pa_assert(s->state == PA_STREAM_CREATING);
818
819     pa_stream_set_state(s, PA_STREAM_READY);
820
821     if (s->requested_bytes > 0 && s->write_callback)
822         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
823
824     if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
825         struct timeval tv;
826         pa_gettimeofday(&tv);
827         s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
828         pa_timeval_add(&tv, s->auto_timing_interval_usec);
829         pa_assert(!s->auto_timing_update_event);
830         s->auto_timing_update_event = s->mainloop->time_new(s->mainloop, &tv, &auto_timing_update_callback, s);
831
832         request_auto_timing_update(s, TRUE);
833     }
834
835     check_smoother_status(s, TRUE, FALSE, FALSE);
836 }
837
838 static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_sample_spec *ss) {
839     pa_assert(s);
840     pa_assert(attr);
841     pa_assert(ss);
842
843     if (s->context->version >= 13)
844         return;
845
846     /* Version older than 0.9.10 didn't do server side buffer_attr
847      * selection, hence we have to fake it on the client side. */
848
849     /* We choose fairly conservative values here, to not confuse
850      * old clients with extremely large playback buffers */
851
852     if (attr->maxlength == (uint32_t) -1)
853         attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
854
855     if (attr->tlength == (uint32_t) -1)
856         attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
857
858     if (attr->minreq == (uint32_t) -1)
859         attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
860
861     if (attr->prebuf == (uint32_t) -1)
862         attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
863
864     if (attr->fragsize == (uint32_t) -1)
865         attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
866 }
867
868 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
869     pa_stream *s = userdata;
870     uint32_t requested_bytes;
871
872     pa_assert(pd);
873     pa_assert(s);
874     pa_assert(PA_REFCNT_VALUE(s) >= 1);
875     pa_assert(s->state == PA_STREAM_CREATING);
876
877     pa_stream_ref(s);
878
879     if (command != PA_COMMAND_REPLY) {
880         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
881             goto finish;
882
883         pa_stream_set_state(s, PA_STREAM_FAILED);
884         goto finish;
885     }
886
887     if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
888         s->channel == PA_INVALID_INDEX ||
889         ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 ||  s->stream_index == PA_INVALID_INDEX)) ||
890         ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
891         pa_context_fail(s->context, PA_ERR_PROTOCOL);
892         goto finish;
893     }
894
895     s->requested_bytes = (int64_t) requested_bytes;
896
897     if (s->context->version >= 9) {
898         if (s->direction == PA_STREAM_PLAYBACK) {
899             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
900                 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
901                 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
902                 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
903                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
904                 goto finish;
905             }
906         } else if (s->direction == PA_STREAM_RECORD) {
907             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
908                 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
909                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
910                 goto finish;
911             }
912         }
913     }
914
915     if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
916         pa_sample_spec ss;
917         pa_channel_map cm;
918         const char *dn = NULL;
919         pa_bool_t suspended;
920
921         if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
922             pa_tagstruct_get_channel_map(t, &cm) < 0 ||
923             pa_tagstruct_getu32(t, &s->device_index) < 0 ||
924             pa_tagstruct_gets(t, &dn) < 0 ||
925             pa_tagstruct_get_boolean(t, &suspended) < 0) {
926             pa_context_fail(s->context, PA_ERR_PROTOCOL);
927             goto finish;
928         }
929
930         if (!dn || s->device_index == PA_INVALID_INDEX ||
931             ss.channels != cm.channels ||
932             !pa_channel_map_valid(&cm) ||
933             !pa_sample_spec_valid(&ss) ||
934             (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
935             (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
936             (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))) {
937             pa_context_fail(s->context, PA_ERR_PROTOCOL);
938             goto finish;
939         }
940
941         pa_xfree(s->device_name);
942         s->device_name = pa_xstrdup(dn);
943         s->suspended = suspended;
944
945         s->channel_map = cm;
946         s->sample_spec = ss;
947     }
948
949     if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
950         pa_usec_t usec;
951
952         if (pa_tagstruct_get_usec(t, &usec) < 0) {
953             pa_context_fail(s->context, PA_ERR_PROTOCOL);
954             goto finish;
955         }
956
957         if (s->direction == PA_STREAM_RECORD)
958             s->timing_info.configured_source_usec = usec;
959         else
960             s->timing_info.configured_sink_usec = usec;
961     }
962
963     if (!pa_tagstruct_eof(t)) {
964         pa_context_fail(s->context, PA_ERR_PROTOCOL);
965         goto finish;
966     }
967
968     if (s->direction == PA_STREAM_RECORD) {
969         pa_assert(!s->record_memblockq);
970
971         s->record_memblockq = pa_memblockq_new(
972                 0,
973                 s->buffer_attr.maxlength,
974                 0,
975                 pa_frame_size(&s->sample_spec),
976                 1,
977                 0,
978                 0,
979                 NULL);
980     }
981
982     s->channel_valid = TRUE;
983     pa_dynarray_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, s->channel, s);
984
985     create_stream_complete(s);
986
987 finish:
988     pa_stream_unref(s);
989 }
990
991 static int create_stream(
992         pa_stream_direction_t direction,
993         pa_stream *s,
994         const char *dev,
995         const pa_buffer_attr *attr,
996         pa_stream_flags_t flags,
997         const pa_cvolume *volume,
998         pa_stream *sync_stream) {
999
1000     pa_tagstruct *t;
1001     uint32_t tag;
1002     pa_bool_t volume_set = FALSE;
1003
1004     pa_assert(s);
1005     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1006     pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1007
1008     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1009     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1010     PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1011     PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1012                                               PA_STREAM_INTERPOLATE_TIMING|
1013                                               PA_STREAM_NOT_MONOTONIC|
1014                                               PA_STREAM_AUTO_TIMING_UPDATE|
1015                                               PA_STREAM_NO_REMAP_CHANNELS|
1016                                               PA_STREAM_NO_REMIX_CHANNELS|
1017                                               PA_STREAM_FIX_FORMAT|
1018                                               PA_STREAM_FIX_RATE|
1019                                               PA_STREAM_FIX_CHANNELS|
1020                                               PA_STREAM_DONT_MOVE|
1021                                               PA_STREAM_VARIABLE_RATE|
1022                                               PA_STREAM_PEAK_DETECT|
1023                                               PA_STREAM_START_MUTED|
1024                                               PA_STREAM_ADJUST_LATENCY|
1025                                               PA_STREAM_EARLY_REQUESTS|
1026                                               PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1027                                               PA_STREAM_START_UNMUTED|
1028                                               PA_STREAM_FAIL_ON_SUSPEND)), PA_ERR_INVALID);
1029
1030     PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1031     PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1032     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1033     /* Althought some of the other flags are not supported on older
1034      * version, we don't check for them here, because it doesn't hurt
1035      * when they are passed but actually not supported. This makes
1036      * client development easier */
1037
1038     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1039     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1040     PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
1041     PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1042     PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1043
1044     pa_stream_ref(s);
1045
1046     s->direction = direction;
1047     s->flags = flags;
1048     s->corked = !!(flags & PA_STREAM_START_CORKED);
1049
1050     if (sync_stream)
1051         s->syncid = sync_stream->syncid;
1052
1053     if (attr)
1054         s->buffer_attr = *attr;
1055     automatic_buffer_attr(s, &s->buffer_attr, &s->sample_spec);
1056
1057     if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1058         pa_usec_t x;
1059
1060         x = pa_rtclock_now();
1061
1062         pa_assert(!s->smoother);
1063         s->smoother = pa_smoother_new(
1064                 SMOOTHER_ADJUST_TIME,
1065                 SMOOTHER_HISTORY_TIME,
1066                 !(flags & PA_STREAM_NOT_MONOTONIC),
1067                 TRUE,
1068                 SMOOTHER_MIN_HISTORY,
1069                 x,
1070                 TRUE);
1071     }
1072
1073     if (!dev)
1074         dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1075
1076     t = pa_tagstruct_command(
1077             s->context,
1078             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1079             &tag);
1080
1081     if (s->context->version < 13)
1082         pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1083
1084     pa_tagstruct_put(
1085             t,
1086             PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1087             PA_TAG_CHANNEL_MAP, &s->channel_map,
1088             PA_TAG_U32, PA_INVALID_INDEX,
1089             PA_TAG_STRING, dev,
1090             PA_TAG_U32, s->buffer_attr.maxlength,
1091             PA_TAG_BOOLEAN, s->corked,
1092             PA_TAG_INVALID);
1093
1094     if (s->direction == PA_STREAM_PLAYBACK) {
1095         pa_cvolume cv;
1096
1097         pa_tagstruct_put(
1098                 t,
1099                 PA_TAG_U32, s->buffer_attr.tlength,
1100                 PA_TAG_U32, s->buffer_attr.prebuf,
1101                 PA_TAG_U32, s->buffer_attr.minreq,
1102                 PA_TAG_U32, s->syncid,
1103                 PA_TAG_INVALID);
1104
1105         volume_set = !!volume;
1106
1107         if (!volume)
1108             volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1109
1110         pa_tagstruct_put_cvolume(t, volume);
1111     } else
1112         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1113
1114     if (s->context->version >= 12) {
1115         pa_tagstruct_put(
1116                 t,
1117                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1118                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1119                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1120                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1121                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1122                 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1123                 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1124                 PA_TAG_INVALID);
1125     }
1126
1127     if (s->context->version >= 13) {
1128
1129         if (s->direction == PA_STREAM_PLAYBACK)
1130             pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1131         else
1132             pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1133
1134         pa_tagstruct_put(
1135                 t,
1136                 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1137                 PA_TAG_PROPLIST, s->proplist,
1138                 PA_TAG_INVALID);
1139
1140         if (s->direction == PA_STREAM_RECORD)
1141             pa_tagstruct_putu32(t, s->direct_on_input);
1142     }
1143
1144     if (s->context->version >= 14) {
1145
1146         if (s->direction == PA_STREAM_PLAYBACK)
1147             pa_tagstruct_put_boolean(t, volume_set);
1148
1149         pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1150     }
1151
1152     if (s->context->version >= 15) {
1153
1154         if (s->direction == PA_STREAM_PLAYBACK)
1155             pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1156
1157         pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1158         pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1159     }
1160
1161     pa_pstream_send_tagstruct(s->context->pstream, t);
1162     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1163
1164     pa_stream_set_state(s, PA_STREAM_CREATING);
1165
1166     pa_stream_unref(s);
1167     return 0;
1168 }
1169
1170 int pa_stream_connect_playback(
1171         pa_stream *s,
1172         const char *dev,
1173         const pa_buffer_attr *attr,
1174         pa_stream_flags_t flags,
1175         pa_cvolume *volume,
1176         pa_stream *sync_stream) {
1177
1178     pa_assert(s);
1179     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1180
1181     return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1182 }
1183
1184 int pa_stream_connect_record(
1185         pa_stream *s,
1186         const char *dev,
1187         const pa_buffer_attr *attr,
1188         pa_stream_flags_t flags) {
1189
1190     pa_assert(s);
1191     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1192
1193     return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1194 }
1195
1196 int pa_stream_write(
1197         pa_stream *s,
1198         const void *data,
1199         size_t length,
1200         void (*free_cb)(void *p),
1201         int64_t offset,
1202         pa_seek_mode_t seek) {
1203
1204     pa_memchunk chunk;
1205     pa_seek_mode_t t_seek;
1206     int64_t t_offset;
1207     size_t t_length;
1208     const void *t_data;
1209
1210     pa_assert(s);
1211     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1212     pa_assert(data);
1213
1214     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1215     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1216     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1217     PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1218     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1219
1220     if (length <= 0)
1221         return 0;
1222
1223     t_seek = seek;
1224     t_offset = offset;
1225     t_length = length;
1226     t_data = data;
1227
1228     while (t_length > 0) {
1229
1230         chunk.index = 0;
1231
1232         if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1233             chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1234             chunk.length = t_length;
1235         } else {
1236             void *d;
1237
1238             chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1239             chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1240
1241             d = pa_memblock_acquire(chunk.memblock);
1242             memcpy(d, t_data, chunk.length);
1243             pa_memblock_release(chunk.memblock);
1244         }
1245
1246         pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1247
1248         t_offset = 0;
1249         t_seek = PA_SEEK_RELATIVE;
1250
1251         t_data = (const uint8_t*) t_data + chunk.length;
1252         t_length -= chunk.length;
1253
1254         pa_memblock_unref(chunk.memblock);
1255     }
1256
1257     if (free_cb && pa_pstream_get_shm(s->context->pstream))
1258         free_cb((void*) data);
1259
1260     /* This is obviously wrong since we ignore the seeking index . But
1261      * that's OK, the server side applies the same error */
1262     s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1263
1264     if (s->direction == PA_STREAM_PLAYBACK) {
1265
1266         /* Update latency request correction */
1267         if (s->write_index_corrections[s->current_write_index_correction].valid) {
1268
1269             if (seek == PA_SEEK_ABSOLUTE) {
1270                 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1271                 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1272                 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1273             } else if (seek == PA_SEEK_RELATIVE) {
1274                 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1275                     s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1276             } else
1277                 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1278         }
1279
1280         /* Update the write index in the already available latency data */
1281         if (s->timing_info_valid) {
1282
1283             if (seek == PA_SEEK_ABSOLUTE) {
1284                 s->timing_info.write_index_corrupt = FALSE;
1285                 s->timing_info.write_index = offset + (int64_t) length;
1286             } else if (seek == PA_SEEK_RELATIVE) {
1287                 if (!s->timing_info.write_index_corrupt)
1288                     s->timing_info.write_index += offset + (int64_t) length;
1289             } else
1290                 s->timing_info.write_index_corrupt = TRUE;
1291         }
1292
1293         if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1294             request_auto_timing_update(s, TRUE);
1295     }
1296
1297     return 0;
1298 }
1299
1300 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1301     pa_assert(s);
1302     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1303     pa_assert(data);
1304     pa_assert(length);
1305
1306     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1307     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1308     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1309
1310     if (!s->peek_memchunk.memblock) {
1311
1312         if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1313             *data = NULL;
1314             *length = 0;
1315             return 0;
1316         }
1317
1318         s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1319     }
1320
1321     pa_assert(s->peek_data);
1322     *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1323     *length = s->peek_memchunk.length;
1324     return 0;
1325 }
1326
1327 int pa_stream_drop(pa_stream *s) {
1328     pa_assert(s);
1329     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1330
1331     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1332     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1333     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1334     PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1335
1336     pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1337
1338     /* Fix the simulated local read index */
1339     if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1340         s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1341
1342     pa_assert(s->peek_data);
1343     pa_memblock_release(s->peek_memchunk.memblock);
1344     pa_memblock_unref(s->peek_memchunk.memblock);
1345     pa_memchunk_reset(&s->peek_memchunk);
1346
1347     return 0;
1348 }
1349
1350 size_t pa_stream_writable_size(pa_stream *s) {
1351     pa_assert(s);
1352     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1353
1354     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1355     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1356     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1357
1358     return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1359 }
1360
1361 size_t pa_stream_readable_size(pa_stream *s) {
1362     pa_assert(s);
1363     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1364
1365     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1366     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1367     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1368
1369     return pa_memblockq_get_length(s->record_memblockq);
1370 }
1371
1372 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1373     pa_operation *o;
1374     pa_tagstruct *t;
1375     uint32_t tag;
1376
1377     pa_assert(s);
1378     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1379
1380     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1381     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1382     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1383
1384     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1385
1386     t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1387     pa_tagstruct_putu32(t, s->channel);
1388     pa_pstream_send_tagstruct(s->context->pstream, t);
1389     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1390
1391     return o;
1392 }
1393
1394 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1395     pa_usec_t usec;
1396
1397     pa_assert(s);
1398     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1399     pa_assert(s->state == PA_STREAM_READY);
1400     pa_assert(s->direction != PA_STREAM_UPLOAD);
1401     pa_assert(s->timing_info_valid);
1402     pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1403     pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1404
1405     if (s->direction == PA_STREAM_PLAYBACK) {
1406         /* The last byte that was written into the output device
1407          * had this time value associated */
1408         usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1409
1410         if (!s->corked && !s->suspended) {
1411
1412             if (!ignore_transport)
1413                 /* Because the latency info took a little time to come
1414                  * to us, we assume that the real output time is actually
1415                  * a little ahead */
1416                 usec += s->timing_info.transport_usec;
1417
1418             /* However, the output device usually maintains a buffer
1419                too, hence the real sample currently played is a little
1420                back  */
1421             if (s->timing_info.sink_usec >= usec)
1422                 usec = 0;
1423             else
1424                 usec -= s->timing_info.sink_usec;
1425         }
1426
1427     } else {
1428         pa_assert(s->direction == PA_STREAM_RECORD);
1429
1430         /* The last byte written into the server side queue had
1431          * this time value associated */
1432         usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1433
1434         if (!s->corked && !s->suspended) {
1435
1436             if (!ignore_transport)
1437                 /* Add transport latency */
1438                 usec += s->timing_info.transport_usec;
1439
1440             /* Add latency of data in device buffer */
1441             usec += s->timing_info.source_usec;
1442
1443             /* If this is a monitor source, we need to correct the
1444              * time by the playback device buffer */
1445             if (s->timing_info.sink_usec >= usec)
1446                 usec = 0;
1447             else
1448                 usec -= s->timing_info.sink_usec;
1449         }
1450     }
1451
1452     return usec;
1453 }
1454
1455 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1456     pa_operation *o = userdata;
1457     struct timeval local, remote, now;
1458     pa_timing_info *i;
1459     pa_bool_t playing = FALSE;
1460     uint64_t underrun_for = 0, playing_for = 0;
1461
1462     pa_assert(pd);
1463     pa_assert(o);
1464     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1465
1466     if (!o->context || !o->stream)
1467         goto finish;
1468
1469     i = &o->stream->timing_info;
1470
1471     o->stream->timing_info_valid = FALSE;
1472     i->write_index_corrupt = TRUE;
1473     i->read_index_corrupt = TRUE;
1474
1475     if (command != PA_COMMAND_REPLY) {
1476         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1477             goto finish;
1478
1479     } else {
1480
1481         if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1482             pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1483             pa_tagstruct_get_boolean(t, &playing) < 0 ||
1484             pa_tagstruct_get_timeval(t, &local) < 0 ||
1485             pa_tagstruct_get_timeval(t, &remote) < 0 ||
1486             pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1487             pa_tagstruct_gets64(t, &i->read_index) < 0) {
1488
1489             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1490             goto finish;
1491         }
1492
1493         if (o->context->version >= 13 &&
1494             o->stream->direction == PA_STREAM_PLAYBACK)
1495             if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1496                 pa_tagstruct_getu64(t, &playing_for) < 0) {
1497
1498                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1499                 goto finish;
1500             }
1501
1502
1503         if (!pa_tagstruct_eof(t)) {
1504             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1505             goto finish;
1506         }
1507         o->stream->timing_info_valid = TRUE;
1508         i->write_index_corrupt = FALSE;
1509         i->read_index_corrupt = FALSE;
1510
1511         i->playing = (int) playing;
1512         i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1513
1514         pa_gettimeofday(&now);
1515
1516         /* Calculcate timestamps */
1517         if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1518             /* local and remote seem to have synchronized clocks */
1519
1520             if (o->stream->direction == PA_STREAM_PLAYBACK)
1521                 i->transport_usec = pa_timeval_diff(&remote, &local);
1522             else
1523                 i->transport_usec = pa_timeval_diff(&now, &remote);
1524
1525             i->synchronized_clocks = TRUE;
1526             i->timestamp = remote;
1527         } else {
1528             /* clocks are not synchronized, let's estimate latency then */
1529             i->transport_usec = pa_timeval_diff(&now, &local)/2;
1530             i->synchronized_clocks = FALSE;
1531             i->timestamp = local;
1532             pa_timeval_add(&i->timestamp, i->transport_usec);
1533         }
1534
1535         /* Invalidate read and write indexes if necessary */
1536         if (tag < o->stream->read_index_not_before)
1537             i->read_index_corrupt = TRUE;
1538
1539         if (tag < o->stream->write_index_not_before)
1540             i->write_index_corrupt = TRUE;
1541
1542         if (o->stream->direction == PA_STREAM_PLAYBACK) {
1543             /* Write index correction */
1544
1545             int n, j;
1546             uint32_t ctag = tag;
1547
1548             /* Go through the saved correction values and add up the
1549              * total correction.*/
1550             for (n = 0, j = o->stream->current_write_index_correction+1;
1551                  n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1552                  n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1553
1554                 /* Step over invalid data or out-of-date data */
1555                 if (!o->stream->write_index_corrections[j].valid ||
1556                     o->stream->write_index_corrections[j].tag < ctag)
1557                     continue;
1558
1559                 /* Make sure that everything is in order */
1560                 ctag = o->stream->write_index_corrections[j].tag+1;
1561
1562                 /* Now fix the write index */
1563                 if (o->stream->write_index_corrections[j].corrupt) {
1564                     /* A corrupting seek was made */
1565                     i->write_index_corrupt = TRUE;
1566                 } else if (o->stream->write_index_corrections[j].absolute) {
1567                     /* An absolute seek was made */
1568                     i->write_index = o->stream->write_index_corrections[j].value;
1569                     i->write_index_corrupt = FALSE;
1570                 } else if (!i->write_index_corrupt) {
1571                     /* A relative seek was made */
1572                     i->write_index += o->stream->write_index_corrections[j].value;
1573                 }
1574             }
1575
1576             /* Clear old correction entries */
1577             for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1578                 if (!o->stream->write_index_corrections[n].valid)
1579                     continue;
1580
1581                 if (o->stream->write_index_corrections[n].tag <= tag)
1582                     o->stream->write_index_corrections[n].valid = FALSE;
1583             }
1584         }
1585
1586         if (o->stream->direction == PA_STREAM_RECORD) {
1587             /* Read index correction */
1588
1589             if (!i->read_index_corrupt)
1590                 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1591         }
1592
1593         /* Update smoother */
1594         if (o->stream->smoother) {
1595             pa_usec_t u, x;
1596
1597             u = x = pa_rtclock_now() - i->transport_usec;
1598
1599             if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1600                 pa_usec_t su;
1601
1602                 /* If we weren't playing then it will take some time
1603                  * until the audio will actually come out through the
1604                  * speakers. Since we follow that timing here, we need
1605                  * to try to fix this up */
1606
1607                 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1608
1609                 if (su < i->sink_usec)
1610                     x += i->sink_usec - su;
1611             }
1612
1613             if (!i->playing)
1614                 pa_smoother_pause(o->stream->smoother, x);
1615
1616             /* Update the smoother */
1617             if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1618                 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1619                 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1620
1621             if (i->playing)
1622                 pa_smoother_resume(o->stream->smoother, x, TRUE);
1623         }
1624     }
1625
1626     o->stream->auto_timing_update_requested = FALSE;
1627
1628     if (o->stream->latency_update_callback)
1629         o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1630
1631     if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1632         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1633         cb(o->stream, o->stream->timing_info_valid, o->userdata);
1634     }
1635
1636 finish:
1637
1638     pa_operation_done(o);
1639     pa_operation_unref(o);
1640 }
1641
1642 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1643     uint32_t tag;
1644     pa_operation *o;
1645     pa_tagstruct *t;
1646     struct timeval now;
1647     int cidx = 0;
1648
1649     pa_assert(s);
1650     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1651
1652     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1653     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1654     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1655
1656     if (s->direction == PA_STREAM_PLAYBACK) {
1657         /* Find a place to store the write_index correction data for this entry */
1658         cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1659
1660         /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1661         PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1662     }
1663     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1664
1665     t = pa_tagstruct_command(
1666             s->context,
1667             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1668             &tag);
1669     pa_tagstruct_putu32(t, s->channel);
1670     pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1671
1672     pa_pstream_send_tagstruct(s->context->pstream, t);
1673     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1674
1675     if (s->direction == PA_STREAM_PLAYBACK) {
1676         /* Fill in initial correction data */
1677
1678         s->current_write_index_correction = cidx;
1679
1680         s->write_index_corrections[cidx].valid = TRUE;
1681         s->write_index_corrections[cidx].absolute = FALSE;
1682         s->write_index_corrections[cidx].corrupt = FALSE;
1683         s->write_index_corrections[cidx].tag = tag;
1684         s->write_index_corrections[cidx].value = 0;
1685     }
1686
1687     return o;
1688 }
1689
1690 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1691     pa_stream *s = userdata;
1692
1693     pa_assert(pd);
1694     pa_assert(s);
1695     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1696
1697     pa_stream_ref(s);
1698
1699     if (command != PA_COMMAND_REPLY) {
1700         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1701             goto finish;
1702
1703         pa_stream_set_state(s, PA_STREAM_FAILED);
1704         goto finish;
1705     } else if (!pa_tagstruct_eof(t)) {
1706         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1707         goto finish;
1708     }
1709
1710     pa_stream_set_state(s, PA_STREAM_TERMINATED);
1711
1712 finish:
1713     pa_stream_unref(s);
1714 }
1715
1716 int pa_stream_disconnect(pa_stream *s) {
1717     pa_tagstruct *t;
1718     uint32_t tag;
1719
1720     pa_assert(s);
1721     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1722
1723     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1724     PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1725     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1726
1727     pa_stream_ref(s);
1728
1729     t = pa_tagstruct_command(
1730             s->context,
1731             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1732                         (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1733             &tag);
1734     pa_tagstruct_putu32(t, s->channel);
1735     pa_pstream_send_tagstruct(s->context->pstream, t);
1736     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1737
1738     pa_stream_unref(s);
1739     return 0;
1740 }
1741
1742 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1743     pa_assert(s);
1744     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1745
1746     if (pa_detect_fork())
1747         return;
1748
1749     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1750         return;
1751
1752     s->read_callback = cb;
1753     s->read_userdata = userdata;
1754 }
1755
1756 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1757     pa_assert(s);
1758     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1759
1760     if (pa_detect_fork())
1761         return;
1762
1763     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1764         return;
1765
1766     s->write_callback = cb;
1767     s->write_userdata = userdata;
1768 }
1769
1770 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1771     pa_assert(s);
1772     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1773
1774     if (pa_detect_fork())
1775         return;
1776
1777     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1778         return;
1779
1780     s->state_callback = cb;
1781     s->state_userdata = userdata;
1782 }
1783
1784 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1785     pa_assert(s);
1786     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1787
1788     if (pa_detect_fork())
1789         return;
1790
1791     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1792         return;
1793
1794     s->overflow_callback = cb;
1795     s->overflow_userdata = userdata;
1796 }
1797
1798 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1799     pa_assert(s);
1800     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1801
1802     if (pa_detect_fork())
1803         return;
1804
1805     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1806         return;
1807
1808     s->underflow_callback = cb;
1809     s->underflow_userdata = userdata;
1810 }
1811
1812 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1813     pa_assert(s);
1814     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1815
1816     if (pa_detect_fork())
1817         return;
1818
1819     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1820         return;
1821
1822     s->latency_update_callback = cb;
1823     s->latency_update_userdata = userdata;
1824 }
1825
1826 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1827     pa_assert(s);
1828     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1829
1830     if (pa_detect_fork())
1831         return;
1832
1833     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1834         return;
1835
1836     s->moved_callback = cb;
1837     s->moved_userdata = userdata;
1838 }
1839
1840 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1841     pa_assert(s);
1842     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1843
1844     if (pa_detect_fork())
1845         return;
1846
1847     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1848         return;
1849
1850     s->suspended_callback = cb;
1851     s->suspended_userdata = userdata;
1852 }
1853
1854 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1855     pa_assert(s);
1856     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1857
1858     if (pa_detect_fork())
1859         return;
1860
1861     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1862         return;
1863
1864     s->started_callback = cb;
1865     s->started_userdata = userdata;
1866 }
1867
1868 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
1869     pa_assert(s);
1870     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1871
1872     if (pa_detect_fork())
1873         return;
1874
1875     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1876         return;
1877
1878     s->event_callback = cb;
1879     s->event_userdata = userdata;
1880 }
1881
1882 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1883     pa_assert(s);
1884     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1885
1886     if (pa_detect_fork())
1887         return;
1888
1889     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1890         return;
1891
1892     s->buffer_attr_callback = cb;
1893     s->buffer_attr_userdata = userdata;
1894 }
1895
1896 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1897     pa_operation *o = userdata;
1898     int success = 1;
1899
1900     pa_assert(pd);
1901     pa_assert(o);
1902     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1903
1904     if (!o->context)
1905         goto finish;
1906
1907     if (command != PA_COMMAND_REPLY) {
1908         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1909             goto finish;
1910
1911         success = 0;
1912     } else if (!pa_tagstruct_eof(t)) {
1913         pa_context_fail(o->context, PA_ERR_PROTOCOL);
1914         goto finish;
1915     }
1916
1917     if (o->callback) {
1918         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1919         cb(o->stream, success, o->userdata);
1920     }
1921
1922 finish:
1923     pa_operation_done(o);
1924     pa_operation_unref(o);
1925 }
1926
1927 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
1928     pa_operation *o;
1929     pa_tagstruct *t;
1930     uint32_t tag;
1931
1932     pa_assert(s);
1933     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1934
1935     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1936     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1937     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1938
1939     s->corked = b;
1940
1941     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1942
1943     t = pa_tagstruct_command(
1944             s->context,
1945             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
1946             &tag);
1947     pa_tagstruct_putu32(t, s->channel);
1948     pa_tagstruct_put_boolean(t, !!b);
1949     pa_pstream_send_tagstruct(s->context->pstream, t);
1950     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1951
1952     check_smoother_status(s, FALSE, FALSE, FALSE);
1953
1954     /* This might cause the indexes to hang/start again, hence
1955      * let's request a timing update */
1956     request_auto_timing_update(s, TRUE);
1957
1958     return o;
1959 }
1960
1961 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
1962     pa_tagstruct *t;
1963     pa_operation *o;
1964     uint32_t tag;
1965
1966     pa_assert(s);
1967     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1968
1969     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1970     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1971
1972     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1973
1974     t = pa_tagstruct_command(s->context, command, &tag);
1975     pa_tagstruct_putu32(t, s->channel);
1976     pa_pstream_send_tagstruct(s->context->pstream, t);
1977     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1978
1979     return o;
1980 }
1981
1982 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1983     pa_operation *o;
1984
1985     pa_assert(s);
1986     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1987
1988     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1989     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1990     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1991
1992     if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
1993         return NULL;
1994
1995     if (s->direction == PA_STREAM_PLAYBACK) {
1996
1997         if (s->write_index_corrections[s->current_write_index_correction].valid)
1998             s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1999
2000         if (s->buffer_attr.prebuf > 0)
2001             check_smoother_status(s, FALSE, FALSE, TRUE);
2002
2003         /* This will change the write index, but leave the
2004          * read index untouched. */
2005         invalidate_indexes(s, FALSE, TRUE);
2006
2007     } else
2008         /* For record streams this has no influence on the write
2009          * index, but the read index might jump. */
2010         invalidate_indexes(s, TRUE, FALSE);
2011
2012     return o;
2013 }
2014
2015 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2016     pa_operation *o;
2017
2018     pa_assert(s);
2019     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2020
2021     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2022     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2023     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2024     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2025
2026     if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2027         return NULL;
2028
2029     /* This might cause the read index to hang again, hence
2030      * let's request a timing update */
2031     request_auto_timing_update(s, TRUE);
2032
2033     return o;
2034 }
2035
2036 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2037     pa_operation *o;
2038
2039     pa_assert(s);
2040     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2041
2042     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2043     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2044     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2045     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2046
2047     if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2048         return NULL;
2049
2050     /* This might cause the read index to start moving again, hence
2051      * let's request a timing update */
2052     request_auto_timing_update(s, TRUE);
2053
2054     return o;
2055 }
2056
2057 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2058     pa_operation *o;
2059
2060     pa_assert(s);
2061     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2062     pa_assert(name);
2063
2064     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2065     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2066     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2067
2068     if (s->context->version >= 13) {
2069         pa_proplist *p = pa_proplist_new();
2070
2071         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2072         o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2073         pa_proplist_free(p);
2074     } else {
2075         pa_tagstruct *t;
2076         uint32_t tag;
2077
2078         o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2079         t = pa_tagstruct_command(
2080                 s->context,
2081                 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2082                 &tag);
2083         pa_tagstruct_putu32(t, s->channel);
2084         pa_tagstruct_puts(t, name);
2085         pa_pstream_send_tagstruct(s->context->pstream, t);
2086         pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2087     }
2088
2089     return o;
2090 }
2091
2092 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2093     pa_usec_t usec;
2094
2095     pa_assert(s);
2096     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2097
2098     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2099     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2100     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2101     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2102     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2103     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2104
2105     if (s->smoother)
2106         usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2107     else
2108         usec = calc_time(s, FALSE);
2109
2110     /* Make sure the time runs monotonically */
2111     if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2112         if (usec < s->previous_time)
2113             usec = s->previous_time;
2114         else
2115             s->previous_time = usec;
2116     }
2117
2118     if (r_usec)
2119         *r_usec = usec;
2120
2121     return 0;
2122 }
2123
2124 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2125     pa_assert(s);
2126     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2127
2128     if (negative)
2129         *negative = 0;
2130
2131     if (a >= b)
2132         return a-b;
2133     else {
2134         if (negative && s->direction == PA_STREAM_RECORD) {
2135             *negative = 1;
2136             return b-a;
2137         } else
2138             return 0;
2139     }
2140 }
2141
2142 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2143     pa_usec_t t, c;
2144     int r;
2145     int64_t cindex;
2146
2147     pa_assert(s);
2148     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2149     pa_assert(r_usec);
2150
2151     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2152     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2153     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2154     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2155     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2156     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2157
2158     if ((r = pa_stream_get_time(s, &t)) < 0)
2159         return r;
2160
2161     if (s->direction == PA_STREAM_PLAYBACK)
2162         cindex = s->timing_info.write_index;
2163     else
2164         cindex = s->timing_info.read_index;
2165
2166     if (cindex < 0)
2167         cindex = 0;
2168
2169     c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2170
2171     if (s->direction == PA_STREAM_PLAYBACK)
2172         *r_usec = time_counter_diff(s, c, t, negative);
2173     else
2174         *r_usec = time_counter_diff(s, t, c, negative);
2175
2176     return 0;
2177 }
2178
2179 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2180     pa_assert(s);
2181     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2182
2183     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2184     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2185     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2186     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2187
2188     return &s->timing_info;
2189 }
2190
2191 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2192     pa_assert(s);
2193     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2194
2195     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2196
2197     return &s->sample_spec;
2198 }
2199
2200 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2201     pa_assert(s);
2202     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2203
2204     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2205
2206     return &s->channel_map;
2207 }
2208
2209 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2210     pa_assert(s);
2211     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2212
2213     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2214     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2215     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2216
2217     return &s->buffer_attr;
2218 }
2219
2220 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2221     pa_operation *o = userdata;
2222     int success = 1;
2223
2224     pa_assert(pd);
2225     pa_assert(o);
2226     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2227
2228     if (!o->context)
2229         goto finish;
2230
2231     if (command != PA_COMMAND_REPLY) {
2232         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2233             goto finish;
2234
2235         success = 0;
2236     } else {
2237         if (o->stream->direction == PA_STREAM_PLAYBACK) {
2238             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2239                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2240                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2241                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2242                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2243                 goto finish;
2244             }
2245         } else if (o->stream->direction == PA_STREAM_RECORD) {
2246             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2247                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2248                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2249                 goto finish;
2250             }
2251         }
2252
2253         if (o->stream->context->version >= 13) {
2254             pa_usec_t usec;
2255
2256             if (pa_tagstruct_get_usec(t, &usec) < 0) {
2257                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2258                 goto finish;
2259             }
2260
2261             if (o->stream->direction == PA_STREAM_RECORD)
2262                 o->stream->timing_info.configured_source_usec = usec;
2263             else
2264                 o->stream->timing_info.configured_sink_usec = usec;
2265         }
2266
2267         if (!pa_tagstruct_eof(t)) {
2268             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2269             goto finish;
2270         }
2271     }
2272
2273     if (o->callback) {
2274         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2275         cb(o->stream, success, o->userdata);
2276     }
2277
2278 finish:
2279     pa_operation_done(o);
2280     pa_operation_unref(o);
2281 }
2282
2283
2284 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2285     pa_operation *o;
2286     pa_tagstruct *t;
2287     uint32_t tag;
2288
2289     pa_assert(s);
2290     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2291     pa_assert(attr);
2292
2293     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2294     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2295     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2296     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2297
2298     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2299
2300     t = pa_tagstruct_command(
2301             s->context,
2302             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2303             &tag);
2304     pa_tagstruct_putu32(t, s->channel);
2305
2306     pa_tagstruct_putu32(t, attr->maxlength);
2307
2308     if (s->direction == PA_STREAM_PLAYBACK)
2309         pa_tagstruct_put(
2310                 t,
2311                 PA_TAG_U32, attr->tlength,
2312                 PA_TAG_U32, attr->prebuf,
2313                 PA_TAG_U32, attr->minreq,
2314                 PA_TAG_INVALID);
2315     else
2316         pa_tagstruct_putu32(t, attr->fragsize);
2317
2318     if (s->context->version >= 13)
2319         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2320
2321     if (s->context->version >= 14)
2322         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2323
2324     pa_pstream_send_tagstruct(s->context->pstream, t);
2325     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2326
2327     /* This might cause changes in the read/write indexex, hence let's
2328      * request a timing update */
2329     request_auto_timing_update(s, TRUE);
2330
2331     return o;
2332 }
2333
2334 uint32_t pa_stream_get_device_index(pa_stream *s) {
2335     pa_assert(s);
2336     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2337
2338     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2339     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2340     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2341     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2342     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2343
2344     return s->device_index;
2345 }
2346
2347 const char *pa_stream_get_device_name(pa_stream *s) {
2348     pa_assert(s);
2349     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2350
2351     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2352     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2353     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2354     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2355     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2356
2357     return s->device_name;
2358 }
2359
2360 int pa_stream_is_suspended(pa_stream *s) {
2361     pa_assert(s);
2362     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2363
2364     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2365     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2366     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2367     PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2368
2369     return s->suspended;
2370 }
2371
2372 int pa_stream_is_corked(pa_stream *s) {
2373     pa_assert(s);
2374     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2375
2376     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2377     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2378     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2379
2380     return s->corked;
2381 }
2382
2383 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2384     pa_operation *o = userdata;
2385     int success = 1;
2386
2387     pa_assert(pd);
2388     pa_assert(o);
2389     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2390
2391     if (!o->context)
2392         goto finish;
2393
2394     if (command != PA_COMMAND_REPLY) {
2395         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2396             goto finish;
2397
2398         success = 0;
2399     } else {
2400
2401         if (!pa_tagstruct_eof(t)) {
2402             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2403             goto finish;
2404         }
2405     }
2406
2407     o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2408     pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2409
2410     if (o->callback) {
2411         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2412         cb(o->stream, success, o->userdata);
2413     }
2414
2415 finish:
2416     pa_operation_done(o);
2417     pa_operation_unref(o);
2418 }
2419
2420
2421 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2422     pa_operation *o;
2423     pa_tagstruct *t;
2424     uint32_t tag;
2425
2426     pa_assert(s);
2427     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2428
2429     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2430     PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2431     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2432     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2433     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2434     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2435
2436     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2437     o->private = PA_UINT_TO_PTR(rate);
2438
2439     t = pa_tagstruct_command(
2440             s->context,
2441             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2442             &tag);
2443     pa_tagstruct_putu32(t, s->channel);
2444     pa_tagstruct_putu32(t, rate);
2445
2446     pa_pstream_send_tagstruct(s->context->pstream, t);
2447     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2448
2449     return o;
2450 }
2451
2452 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2453     pa_operation *o;
2454     pa_tagstruct *t;
2455     uint32_t tag;
2456
2457     pa_assert(s);
2458     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2459
2460     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2461     PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2462     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2463     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2464     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2465
2466     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2467
2468     t = pa_tagstruct_command(
2469             s->context,
2470             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2471             &tag);
2472     pa_tagstruct_putu32(t, s->channel);
2473     pa_tagstruct_putu32(t, (uint32_t) mode);
2474     pa_tagstruct_put_proplist(t, p);
2475
2476     pa_pstream_send_tagstruct(s->context->pstream, t);
2477     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2478
2479     /* Please note that we don't update s->proplist here, because we
2480      * don't export that field */
2481
2482     return o;
2483 }
2484
2485 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2486     pa_operation *o;
2487     pa_tagstruct *t;
2488     uint32_t tag;
2489     const char * const*k;
2490
2491     pa_assert(s);
2492     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2493
2494     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2495     PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2496     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2497     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2498     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2499
2500     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2501
2502     t = pa_tagstruct_command(
2503             s->context,
2504             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2505             &tag);
2506     pa_tagstruct_putu32(t, s->channel);
2507
2508     for (k = keys; *k; k++)
2509         pa_tagstruct_puts(t, *k);
2510
2511     pa_tagstruct_puts(t, NULL);
2512
2513     pa_pstream_send_tagstruct(s->context->pstream, t);
2514     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2515
2516     /* Please note that we don't update s->proplist here, because we
2517      * don't export that field */
2518
2519     return o;
2520 }
2521
2522 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2523     pa_assert(s);
2524     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2525
2526     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2527     PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2528     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2529     PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2530
2531     s->direct_on_input = sink_input_idx;
2532
2533     return 0;
2534 }
2535
2536 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2537     pa_assert(s);
2538     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2539
2540     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2541     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2542     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2543
2544     return s->direct_on_input;
2545 }