pulse: move pa_rtclock_now in pulsecommon
[profile/ivi/pulseaudio.git] / src / pulse / stream.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/rtclock.h>
34 #include <pulse/xmalloc.h>
35
36 #include <pulsecore/pstream-util.h>
37 #include <pulsecore/log.h>
38 #include <pulsecore/hashmap.h>
39 #include <pulsecore/macro.h>
40 #include <pulsecore/core-rtclock.h>
41
42 #include "fork-detect.h"
43 #include "internal.h"
44
45 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
46 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
47
48 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
49 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
50 #define SMOOTHER_MIN_HISTORY (4)
51
52 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
53     return pa_stream_new_with_proplist(c, name, ss, map, NULL);
54 }
55
56 static void reset_callbacks(pa_stream *s) {
57     s->read_callback = NULL;
58     s->read_userdata = NULL;
59     s->write_callback = NULL;
60     s->write_userdata = NULL;
61     s->state_callback = NULL;
62     s->state_userdata = NULL;
63     s->overflow_callback = NULL;
64     s->overflow_userdata = NULL;
65     s->underflow_callback = NULL;
66     s->underflow_userdata = NULL;
67     s->latency_update_callback = NULL;
68     s->latency_update_userdata = NULL;
69     s->moved_callback = NULL;
70     s->moved_userdata = NULL;
71     s->suspended_callback = NULL;
72     s->suspended_userdata = NULL;
73     s->started_callback = NULL;
74     s->started_userdata = NULL;
75     s->event_callback = NULL;
76     s->event_userdata = NULL;
77     s->buffer_attr_callback = NULL;
78     s->buffer_attr_userdata = NULL;
79 }
80
81 pa_stream *pa_stream_new_with_proplist(
82         pa_context *c,
83         const char *name,
84         const pa_sample_spec *ss,
85         const pa_channel_map *map,
86         pa_proplist *p) {
87
88     pa_stream *s;
89     int i;
90     pa_channel_map tmap;
91
92     pa_assert(c);
93     pa_assert(PA_REFCNT_VALUE(c) >= 1);
94
95     PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
96     PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
97     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
98     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
99     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
100     PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
101     PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
102
103     if (!map)
104         PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
105
106     s = pa_xnew(pa_stream, 1);
107     PA_REFCNT_INIT(s);
108     s->context = c;
109     s->mainloop = c->mainloop;
110
111     s->direction = PA_STREAM_NODIRECTION;
112     s->state = PA_STREAM_UNCONNECTED;
113     s->flags = 0;
114
115     s->sample_spec = *ss;
116     s->channel_map = *map;
117
118     s->direct_on_input = PA_INVALID_INDEX;
119
120     s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
121     if (name)
122         pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
123
124     s->channel = 0;
125     s->channel_valid = FALSE;
126     s->syncid = c->csyncid++;
127     s->stream_index = PA_INVALID_INDEX;
128
129     s->requested_bytes = 0;
130     memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
131
132     /* We initialize der target length here, so that if the user
133      * passes no explicit buffering metrics the default is similar to
134      * what older PA versions provided. */
135
136     s->buffer_attr.maxlength = (uint32_t) -1;
137     s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
138     s->buffer_attr.minreq = (uint32_t) -1;
139     s->buffer_attr.prebuf = (uint32_t) -1;
140     s->buffer_attr.fragsize = (uint32_t) -1;
141
142     s->device_index = PA_INVALID_INDEX;
143     s->device_name = NULL;
144     s->suspended = FALSE;
145     s->corked = FALSE;
146
147     pa_memchunk_reset(&s->peek_memchunk);
148     s->peek_data = NULL;
149
150     s->record_memblockq = NULL;
151
152
153     memset(&s->timing_info, 0, sizeof(s->timing_info));
154     s->timing_info_valid = FALSE;
155
156     s->previous_time = 0;
157
158     s->read_index_not_before = 0;
159     s->write_index_not_before = 0;
160     for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
161         s->write_index_corrections[i].valid = 0;
162     s->current_write_index_correction = 0;
163
164     s->auto_timing_update_event = NULL;
165     s->auto_timing_update_requested = FALSE;
166     s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
167
168     reset_callbacks(s);
169
170     s->smoother = NULL;
171
172     /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
173     PA_LLIST_PREPEND(pa_stream, c->streams, s);
174     pa_stream_ref(s);
175
176     return s;
177 }
178
179 static void stream_unlink(pa_stream *s) {
180     pa_operation *o, *n;
181     pa_assert(s);
182
183     if (!s->context)
184         return;
185
186     /* Detach from context */
187
188     /* Unref all operatio object that point to us */
189     for (o = s->context->operations; o; o = n) {
190         n = o->next;
191
192         if (o->stream == s)
193             pa_operation_cancel(o);
194     }
195
196     /* Drop all outstanding replies for this stream */
197     if (s->context->pdispatch)
198         pa_pdispatch_unregister_reply(s->context->pdispatch, s);
199
200     if (s->channel_valid) {
201         pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL);
202         s->channel = 0;
203         s->channel_valid = FALSE;
204     }
205
206     PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
207     pa_stream_unref(s);
208
209     s->context = NULL;
210
211     if (s->auto_timing_update_event) {
212         pa_assert(s->mainloop);
213         s->mainloop->time_free(s->auto_timing_update_event);
214     }
215
216     reset_callbacks(s);
217 }
218
219 static void stream_free(pa_stream *s) {
220     pa_assert(s);
221
222     stream_unlink(s);
223
224     if (s->peek_memchunk.memblock) {
225         if (s->peek_data)
226             pa_memblock_release(s->peek_memchunk.memblock);
227         pa_memblock_unref(s->peek_memchunk.memblock);
228     }
229
230     if (s->record_memblockq)
231         pa_memblockq_free(s->record_memblockq);
232
233     if (s->proplist)
234         pa_proplist_free(s->proplist);
235
236     if (s->smoother)
237         pa_smoother_free(s->smoother);
238
239     pa_xfree(s->device_name);
240     pa_xfree(s);
241 }
242
243 void pa_stream_unref(pa_stream *s) {
244     pa_assert(s);
245     pa_assert(PA_REFCNT_VALUE(s) >= 1);
246
247     if (PA_REFCNT_DEC(s) <= 0)
248         stream_free(s);
249 }
250
251 pa_stream* pa_stream_ref(pa_stream *s) {
252     pa_assert(s);
253     pa_assert(PA_REFCNT_VALUE(s) >= 1);
254
255     PA_REFCNT_INC(s);
256     return s;
257 }
258
259 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
260     pa_assert(s);
261     pa_assert(PA_REFCNT_VALUE(s) >= 1);
262
263     return s->state;
264 }
265
266 pa_context* pa_stream_get_context(pa_stream *s) {
267     pa_assert(s);
268     pa_assert(PA_REFCNT_VALUE(s) >= 1);
269
270     return s->context;
271 }
272
273 uint32_t pa_stream_get_index(pa_stream *s) {
274     pa_assert(s);
275     pa_assert(PA_REFCNT_VALUE(s) >= 1);
276
277     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
278     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
279
280     return s->stream_index;
281 }
282
283 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
284     pa_assert(s);
285     pa_assert(PA_REFCNT_VALUE(s) >= 1);
286
287     if (s->state == st)
288         return;
289
290     pa_stream_ref(s);
291
292     s->state = st;
293
294     if (s->state_callback)
295         s->state_callback(s, s->state_userdata);
296
297     if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
298         stream_unlink(s);
299
300     pa_stream_unref(s);
301 }
302
303 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
304     pa_assert(s);
305     pa_assert(PA_REFCNT_VALUE(s) >= 1);
306
307     if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
308         return;
309
310     if (s->state == PA_STREAM_READY &&
311         (force || !s->auto_timing_update_requested)) {
312         pa_operation *o;
313
314 /*         pa_log("Automatically requesting new timing data"); */
315
316         if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
317             pa_operation_unref(o);
318             s->auto_timing_update_requested = TRUE;
319         }
320     }
321
322     if (s->auto_timing_update_event) {
323         struct timeval next;
324
325         if (force)
326             s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
327
328         pa_gettimeofday(&next);
329         pa_timeval_add(&next, s->auto_timing_interval_usec);
330         s->mainloop->time_restart(s->auto_timing_update_event, &next);
331
332         s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
333     }
334 }
335
336 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
337     pa_context *c = userdata;
338     pa_stream *s;
339     uint32_t channel;
340
341     pa_assert(pd);
342     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
343     pa_assert(t);
344     pa_assert(c);
345     pa_assert(PA_REFCNT_VALUE(c) >= 1);
346
347     pa_context_ref(c);
348
349     if (pa_tagstruct_getu32(t, &channel) < 0 ||
350         !pa_tagstruct_eof(t)) {
351         pa_context_fail(c, PA_ERR_PROTOCOL);
352         goto finish;
353     }
354
355     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel)))
356         goto finish;
357
358     if (s->state != PA_STREAM_READY)
359         goto finish;
360
361     pa_context_set_error(c, PA_ERR_KILLED);
362     pa_stream_set_state(s, PA_STREAM_FAILED);
363
364 finish:
365     pa_context_unref(c);
366 }
367
368 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
369     pa_usec_t x;
370
371     pa_assert(s);
372     pa_assert(!force_start || !force_stop);
373
374     if (!s->smoother)
375         return;
376
377     x = pa_rtclock_now();
378
379     if (s->timing_info_valid) {
380         if (aposteriori)
381             x -= s->timing_info.transport_usec;
382         else
383             x += s->timing_info.transport_usec;
384     }
385
386     if (s->suspended || s->corked || force_stop)
387         pa_smoother_pause(s->smoother, x);
388     else if (force_start || s->buffer_attr.prebuf == 0)
389         pa_smoother_resume(s->smoother, x, TRUE);
390
391
392     /* Please note that we have no idea if playback actually started
393      * if prebuf is non-zero! */
394 }
395
396 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
397     pa_context *c = userdata;
398     pa_stream *s;
399     uint32_t channel;
400     const char *dn;
401     pa_bool_t suspended;
402     uint32_t di;
403     pa_usec_t usec = 0;
404     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
405
406     pa_assert(pd);
407     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
408     pa_assert(t);
409     pa_assert(c);
410     pa_assert(PA_REFCNT_VALUE(c) >= 1);
411
412     pa_context_ref(c);
413
414     if (c->version < 12) {
415         pa_context_fail(c, PA_ERR_PROTOCOL);
416         goto finish;
417     }
418
419     if (pa_tagstruct_getu32(t, &channel) < 0 ||
420         pa_tagstruct_getu32(t, &di) < 0 ||
421         pa_tagstruct_gets(t, &dn) < 0 ||
422         pa_tagstruct_get_boolean(t, &suspended) < 0) {
423         pa_context_fail(c, PA_ERR_PROTOCOL);
424         goto finish;
425     }
426
427     if (c->version >= 13) {
428
429         if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
430             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
431                 pa_tagstruct_getu32(t, &fragsize) < 0 ||
432                 pa_tagstruct_get_usec(t, &usec) < 0) {
433                 pa_context_fail(c, PA_ERR_PROTOCOL);
434                 goto finish;
435             }
436         } else {
437             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
438                 pa_tagstruct_getu32(t, &tlength) < 0 ||
439                 pa_tagstruct_getu32(t, &prebuf) < 0 ||
440                 pa_tagstruct_getu32(t, &minreq) < 0 ||
441                 pa_tagstruct_get_usec(t, &usec) < 0) {
442                 pa_context_fail(c, PA_ERR_PROTOCOL);
443                 goto finish;
444             }
445         }
446     }
447
448     if (!pa_tagstruct_eof(t)) {
449         pa_context_fail(c, PA_ERR_PROTOCOL);
450         goto finish;
451     }
452
453     if (!dn || di == PA_INVALID_INDEX) {
454         pa_context_fail(c, PA_ERR_PROTOCOL);
455         goto finish;
456     }
457
458     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, channel)))
459         goto finish;
460
461     if (s->state != PA_STREAM_READY)
462         goto finish;
463
464     if (c->version >= 13) {
465         if (s->direction == PA_STREAM_RECORD)
466             s->timing_info.configured_source_usec = usec;
467         else
468             s->timing_info.configured_sink_usec = usec;
469
470         s->buffer_attr.maxlength = maxlength;
471         s->buffer_attr.fragsize = fragsize;
472         s->buffer_attr.tlength = tlength;
473         s->buffer_attr.prebuf = prebuf;
474         s->buffer_attr.minreq = minreq;
475     }
476
477     pa_xfree(s->device_name);
478     s->device_name = pa_xstrdup(dn);
479     s->device_index = di;
480
481     s->suspended = suspended;
482
483     check_smoother_status(s, TRUE, FALSE, FALSE);
484     request_auto_timing_update(s, TRUE);
485
486     if (s->moved_callback)
487         s->moved_callback(s, s->moved_userdata);
488
489 finish:
490     pa_context_unref(c);
491 }
492
493 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
494     pa_context *c = userdata;
495     pa_stream *s;
496     uint32_t channel;
497     pa_usec_t usec = 0;
498     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
499
500     pa_assert(pd);
501     pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
502     pa_assert(t);
503     pa_assert(c);
504     pa_assert(PA_REFCNT_VALUE(c) >= 1);
505
506     pa_context_ref(c);
507
508     if (c->version < 15) {
509         pa_context_fail(c, PA_ERR_PROTOCOL);
510         goto finish;
511     }
512
513     if (pa_tagstruct_getu32(t, &channel) < 0) {
514         pa_context_fail(c, PA_ERR_PROTOCOL);
515         goto finish;
516     }
517
518     if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
519         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
520             pa_tagstruct_getu32(t, &fragsize) < 0 ||
521             pa_tagstruct_get_usec(t, &usec) < 0) {
522             pa_context_fail(c, PA_ERR_PROTOCOL);
523             goto finish;
524         }
525     } else {
526         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
527             pa_tagstruct_getu32(t, &tlength) < 0 ||
528             pa_tagstruct_getu32(t, &prebuf) < 0 ||
529             pa_tagstruct_getu32(t, &minreq) < 0 ||
530             pa_tagstruct_get_usec(t, &usec) < 0) {
531             pa_context_fail(c, PA_ERR_PROTOCOL);
532             goto finish;
533         }
534     }
535
536     if (!pa_tagstruct_eof(t)) {
537         pa_context_fail(c, PA_ERR_PROTOCOL);
538         goto finish;
539     }
540
541     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, channel)))
542         goto finish;
543
544     if (s->state != PA_STREAM_READY)
545         goto finish;
546
547     if (s->direction == PA_STREAM_RECORD)
548         s->timing_info.configured_source_usec = usec;
549     else
550         s->timing_info.configured_sink_usec = usec;
551
552     s->buffer_attr.maxlength = maxlength;
553     s->buffer_attr.fragsize = fragsize;
554     s->buffer_attr.tlength = tlength;
555     s->buffer_attr.prebuf = prebuf;
556     s->buffer_attr.minreq = minreq;
557
558     request_auto_timing_update(s, TRUE);
559
560     if (s->buffer_attr_callback)
561         s->buffer_attr_callback(s, s->buffer_attr_userdata);
562
563 finish:
564     pa_context_unref(c);
565 }
566
567 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
568     pa_context *c = userdata;
569     pa_stream *s;
570     uint32_t channel;
571     pa_bool_t suspended;
572
573     pa_assert(pd);
574     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
575     pa_assert(t);
576     pa_assert(c);
577     pa_assert(PA_REFCNT_VALUE(c) >= 1);
578
579     pa_context_ref(c);
580
581     if (c->version < 12) {
582         pa_context_fail(c, PA_ERR_PROTOCOL);
583         goto finish;
584     }
585
586     if (pa_tagstruct_getu32(t, &channel) < 0 ||
587         pa_tagstruct_get_boolean(t, &suspended) < 0 ||
588         !pa_tagstruct_eof(t)) {
589         pa_context_fail(c, PA_ERR_PROTOCOL);
590         goto finish;
591     }
592
593     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, channel)))
594         goto finish;
595
596     if (s->state != PA_STREAM_READY)
597         goto finish;
598
599     s->suspended = suspended;
600
601     check_smoother_status(s, TRUE, FALSE, FALSE);
602     request_auto_timing_update(s, TRUE);
603
604     if (s->suspended_callback)
605         s->suspended_callback(s, s->suspended_userdata);
606
607 finish:
608     pa_context_unref(c);
609 }
610
611 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
612     pa_context *c = userdata;
613     pa_stream *s;
614     uint32_t channel;
615
616     pa_assert(pd);
617     pa_assert(command == PA_COMMAND_STARTED);
618     pa_assert(t);
619     pa_assert(c);
620     pa_assert(PA_REFCNT_VALUE(c) >= 1);
621
622     pa_context_ref(c);
623
624     if (c->version < 13) {
625         pa_context_fail(c, PA_ERR_PROTOCOL);
626         goto finish;
627     }
628
629     if (pa_tagstruct_getu32(t, &channel) < 0 ||
630         !pa_tagstruct_eof(t)) {
631         pa_context_fail(c, PA_ERR_PROTOCOL);
632         goto finish;
633     }
634
635     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
636         goto finish;
637
638     if (s->state != PA_STREAM_READY)
639         goto finish;
640
641     check_smoother_status(s, TRUE, TRUE, FALSE);
642     request_auto_timing_update(s, TRUE);
643
644     if (s->started_callback)
645         s->started_callback(s, s->started_userdata);
646
647 finish:
648     pa_context_unref(c);
649 }
650
651 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
652     pa_context *c = userdata;
653     pa_stream *s;
654     uint32_t channel;
655     pa_proplist *pl = NULL;
656     const char *event;
657
658     pa_assert(pd);
659     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
660     pa_assert(t);
661     pa_assert(c);
662     pa_assert(PA_REFCNT_VALUE(c) >= 1);
663
664     pa_context_ref(c);
665
666     if (c->version < 15) {
667         pa_context_fail(c, PA_ERR_PROTOCOL);
668         goto finish;
669     }
670
671     pl = pa_proplist_new();
672
673     if (pa_tagstruct_getu32(t, &channel) < 0 ||
674         pa_tagstruct_gets(t, &event) < 0 ||
675         pa_tagstruct_get_proplist(t, pl) < 0 ||
676         !pa_tagstruct_eof(t) || !event) {
677         pa_context_fail(c, PA_ERR_PROTOCOL);
678         goto finish;
679     }
680
681     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, channel)))
682         goto finish;
683
684     if (s->state != PA_STREAM_READY)
685         goto finish;
686
687     if (s->event_callback)
688         s->event_callback(s, event, pl, s->event_userdata);
689
690 finish:
691     pa_context_unref(c);
692
693     if (pl)
694         pa_proplist_free(pl);
695 }
696
697 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
698     pa_stream *s;
699     pa_context *c = userdata;
700     uint32_t bytes, channel;
701
702     pa_assert(pd);
703     pa_assert(command == PA_COMMAND_REQUEST);
704     pa_assert(t);
705     pa_assert(c);
706     pa_assert(PA_REFCNT_VALUE(c) >= 1);
707
708     pa_context_ref(c);
709
710     if (pa_tagstruct_getu32(t, &channel) < 0 ||
711         pa_tagstruct_getu32(t, &bytes) < 0 ||
712         !pa_tagstruct_eof(t)) {
713         pa_context_fail(c, PA_ERR_PROTOCOL);
714         goto finish;
715     }
716
717     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
718         goto finish;
719
720     if (s->state != PA_STREAM_READY)
721         goto finish;
722
723     s->requested_bytes += bytes;
724
725     if (s->requested_bytes > 0 && s->write_callback)
726         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
727
728 finish:
729     pa_context_unref(c);
730 }
731
732 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
733     pa_stream *s;
734     pa_context *c = userdata;
735     uint32_t channel;
736
737     pa_assert(pd);
738     pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
739     pa_assert(t);
740     pa_assert(c);
741     pa_assert(PA_REFCNT_VALUE(c) >= 1);
742
743     pa_context_ref(c);
744
745     if (pa_tagstruct_getu32(t, &channel) < 0 ||
746         !pa_tagstruct_eof(t)) {
747         pa_context_fail(c, PA_ERR_PROTOCOL);
748         goto finish;
749     }
750
751     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
752         goto finish;
753
754     if (s->state != PA_STREAM_READY)
755         goto finish;
756
757     if (s->buffer_attr.prebuf > 0)
758         check_smoother_status(s, TRUE, FALSE, TRUE);
759
760     request_auto_timing_update(s, TRUE);
761
762     if (command == PA_COMMAND_OVERFLOW) {
763         if (s->overflow_callback)
764             s->overflow_callback(s, s->overflow_userdata);
765     } else if (command == PA_COMMAND_UNDERFLOW) {
766         if (s->underflow_callback)
767             s->underflow_callback(s, s->underflow_userdata);
768     }
769
770  finish:
771     pa_context_unref(c);
772 }
773
774 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
775     pa_assert(s);
776     pa_assert(PA_REFCNT_VALUE(s) >= 1);
777
778 /*     pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
779
780     if (s->state != PA_STREAM_READY)
781         return;
782
783     if (w) {
784         s->write_index_not_before = s->context->ctag;
785
786         if (s->timing_info_valid)
787             s->timing_info.write_index_corrupt = TRUE;
788
789 /*         pa_log("write_index invalidated"); */
790     }
791
792     if (r) {
793         s->read_index_not_before = s->context->ctag;
794
795         if (s->timing_info_valid)
796             s->timing_info.read_index_corrupt = TRUE;
797
798 /*         pa_log("read_index invalidated"); */
799     }
800
801     request_auto_timing_update(s, TRUE);
802 }
803
804 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *tv, void *userdata) {
805     pa_stream *s = userdata;
806
807     pa_assert(s);
808     pa_assert(PA_REFCNT_VALUE(s) >= 1);
809
810     pa_stream_ref(s);
811     request_auto_timing_update(s, FALSE);
812     pa_stream_unref(s);
813 }
814
815 static void create_stream_complete(pa_stream *s) {
816     pa_assert(s);
817     pa_assert(PA_REFCNT_VALUE(s) >= 1);
818     pa_assert(s->state == PA_STREAM_CREATING);
819
820     pa_stream_set_state(s, PA_STREAM_READY);
821
822     if (s->requested_bytes > 0 && s->write_callback)
823         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
824
825     if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
826         struct timeval tv;
827         pa_gettimeofday(&tv);
828         s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
829         pa_timeval_add(&tv, s->auto_timing_interval_usec);
830         pa_assert(!s->auto_timing_update_event);
831         s->auto_timing_update_event = s->mainloop->time_new(s->mainloop, &tv, &auto_timing_update_callback, s);
832
833         request_auto_timing_update(s, TRUE);
834     }
835
836     check_smoother_status(s, TRUE, FALSE, FALSE);
837 }
838
839 static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_sample_spec *ss) {
840     pa_assert(s);
841     pa_assert(attr);
842     pa_assert(ss);
843
844     if (s->context->version >= 13)
845         return;
846
847     /* Version older than 0.9.10 didn't do server side buffer_attr
848      * selection, hence we have to fake it on the client side. */
849
850     /* We choose fairly conservative values here, to not confuse
851      * old clients with extremely large playback buffers */
852
853     if (attr->maxlength == (uint32_t) -1)
854         attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
855
856     if (attr->tlength == (uint32_t) -1)
857         attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
858
859     if (attr->minreq == (uint32_t) -1)
860         attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
861
862     if (attr->prebuf == (uint32_t) -1)
863         attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
864
865     if (attr->fragsize == (uint32_t) -1)
866         attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
867 }
868
869 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
870     pa_stream *s = userdata;
871     uint32_t requested_bytes;
872
873     pa_assert(pd);
874     pa_assert(s);
875     pa_assert(PA_REFCNT_VALUE(s) >= 1);
876     pa_assert(s->state == PA_STREAM_CREATING);
877
878     pa_stream_ref(s);
879
880     if (command != PA_COMMAND_REPLY) {
881         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
882             goto finish;
883
884         pa_stream_set_state(s, PA_STREAM_FAILED);
885         goto finish;
886     }
887
888     if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
889         s->channel == PA_INVALID_INDEX ||
890         ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 ||  s->stream_index == PA_INVALID_INDEX)) ||
891         ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
892         pa_context_fail(s->context, PA_ERR_PROTOCOL);
893         goto finish;
894     }
895
896     s->requested_bytes = (int64_t) requested_bytes;
897
898     if (s->context->version >= 9) {
899         if (s->direction == PA_STREAM_PLAYBACK) {
900             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
901                 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
902                 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
903                 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
904                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
905                 goto finish;
906             }
907         } else if (s->direction == PA_STREAM_RECORD) {
908             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
909                 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
910                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
911                 goto finish;
912             }
913         }
914     }
915
916     if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
917         pa_sample_spec ss;
918         pa_channel_map cm;
919         const char *dn = NULL;
920         pa_bool_t suspended;
921
922         if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
923             pa_tagstruct_get_channel_map(t, &cm) < 0 ||
924             pa_tagstruct_getu32(t, &s->device_index) < 0 ||
925             pa_tagstruct_gets(t, &dn) < 0 ||
926             pa_tagstruct_get_boolean(t, &suspended) < 0) {
927             pa_context_fail(s->context, PA_ERR_PROTOCOL);
928             goto finish;
929         }
930
931         if (!dn || s->device_index == PA_INVALID_INDEX ||
932             ss.channels != cm.channels ||
933             !pa_channel_map_valid(&cm) ||
934             !pa_sample_spec_valid(&ss) ||
935             (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
936             (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
937             (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))) {
938             pa_context_fail(s->context, PA_ERR_PROTOCOL);
939             goto finish;
940         }
941
942         pa_xfree(s->device_name);
943         s->device_name = pa_xstrdup(dn);
944         s->suspended = suspended;
945
946         s->channel_map = cm;
947         s->sample_spec = ss;
948     }
949
950     if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
951         pa_usec_t usec;
952
953         if (pa_tagstruct_get_usec(t, &usec) < 0) {
954             pa_context_fail(s->context, PA_ERR_PROTOCOL);
955             goto finish;
956         }
957
958         if (s->direction == PA_STREAM_RECORD)
959             s->timing_info.configured_source_usec = usec;
960         else
961             s->timing_info.configured_sink_usec = usec;
962     }
963
964     if (!pa_tagstruct_eof(t)) {
965         pa_context_fail(s->context, PA_ERR_PROTOCOL);
966         goto finish;
967     }
968
969     if (s->direction == PA_STREAM_RECORD) {
970         pa_assert(!s->record_memblockq);
971
972         s->record_memblockq = pa_memblockq_new(
973                 0,
974                 s->buffer_attr.maxlength,
975                 0,
976                 pa_frame_size(&s->sample_spec),
977                 1,
978                 0,
979                 0,
980                 NULL);
981     }
982
983     s->channel_valid = TRUE;
984     pa_dynarray_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, s->channel, s);
985
986     create_stream_complete(s);
987
988 finish:
989     pa_stream_unref(s);
990 }
991
992 static int create_stream(
993         pa_stream_direction_t direction,
994         pa_stream *s,
995         const char *dev,
996         const pa_buffer_attr *attr,
997         pa_stream_flags_t flags,
998         const pa_cvolume *volume,
999         pa_stream *sync_stream) {
1000
1001     pa_tagstruct *t;
1002     uint32_t tag;
1003     pa_bool_t volume_set = FALSE;
1004
1005     pa_assert(s);
1006     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1007     pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1008
1009     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1010     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1011     PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1012     PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1013                                               PA_STREAM_INTERPOLATE_TIMING|
1014                                               PA_STREAM_NOT_MONOTONIC|
1015                                               PA_STREAM_AUTO_TIMING_UPDATE|
1016                                               PA_STREAM_NO_REMAP_CHANNELS|
1017                                               PA_STREAM_NO_REMIX_CHANNELS|
1018                                               PA_STREAM_FIX_FORMAT|
1019                                               PA_STREAM_FIX_RATE|
1020                                               PA_STREAM_FIX_CHANNELS|
1021                                               PA_STREAM_DONT_MOVE|
1022                                               PA_STREAM_VARIABLE_RATE|
1023                                               PA_STREAM_PEAK_DETECT|
1024                                               PA_STREAM_START_MUTED|
1025                                               PA_STREAM_ADJUST_LATENCY|
1026                                               PA_STREAM_EARLY_REQUESTS|
1027                                               PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1028                                               PA_STREAM_START_UNMUTED|
1029                                               PA_STREAM_FAIL_ON_SUSPEND)), PA_ERR_INVALID);
1030
1031     PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1032     PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1033     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1034     /* Althought some of the other flags are not supported on older
1035      * version, we don't check for them here, because it doesn't hurt
1036      * when they are passed but actually not supported. This makes
1037      * client development easier */
1038
1039     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1040     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1041     PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
1042     PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1043     PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1044
1045     pa_stream_ref(s);
1046
1047     s->direction = direction;
1048     s->flags = flags;
1049     s->corked = !!(flags & PA_STREAM_START_CORKED);
1050
1051     if (sync_stream)
1052         s->syncid = sync_stream->syncid;
1053
1054     if (attr)
1055         s->buffer_attr = *attr;
1056     automatic_buffer_attr(s, &s->buffer_attr, &s->sample_spec);
1057
1058     if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1059         pa_usec_t x;
1060
1061         x = pa_rtclock_now();
1062
1063         pa_assert(!s->smoother);
1064         s->smoother = pa_smoother_new(
1065                 SMOOTHER_ADJUST_TIME,
1066                 SMOOTHER_HISTORY_TIME,
1067                 !(flags & PA_STREAM_NOT_MONOTONIC),
1068                 TRUE,
1069                 SMOOTHER_MIN_HISTORY,
1070                 x,
1071                 TRUE);
1072     }
1073
1074     if (!dev)
1075         dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1076
1077     t = pa_tagstruct_command(
1078             s->context,
1079             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1080             &tag);
1081
1082     if (s->context->version < 13)
1083         pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1084
1085     pa_tagstruct_put(
1086             t,
1087             PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1088             PA_TAG_CHANNEL_MAP, &s->channel_map,
1089             PA_TAG_U32, PA_INVALID_INDEX,
1090             PA_TAG_STRING, dev,
1091             PA_TAG_U32, s->buffer_attr.maxlength,
1092             PA_TAG_BOOLEAN, s->corked,
1093             PA_TAG_INVALID);
1094
1095     if (s->direction == PA_STREAM_PLAYBACK) {
1096         pa_cvolume cv;
1097
1098         pa_tagstruct_put(
1099                 t,
1100                 PA_TAG_U32, s->buffer_attr.tlength,
1101                 PA_TAG_U32, s->buffer_attr.prebuf,
1102                 PA_TAG_U32, s->buffer_attr.minreq,
1103                 PA_TAG_U32, s->syncid,
1104                 PA_TAG_INVALID);
1105
1106         volume_set = !!volume;
1107
1108         if (!volume)
1109             volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1110
1111         pa_tagstruct_put_cvolume(t, volume);
1112     } else
1113         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1114
1115     if (s->context->version >= 12) {
1116         pa_tagstruct_put(
1117                 t,
1118                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1119                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1120                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1121                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1122                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1123                 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1124                 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1125                 PA_TAG_INVALID);
1126     }
1127
1128     if (s->context->version >= 13) {
1129
1130         if (s->direction == PA_STREAM_PLAYBACK)
1131             pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1132         else
1133             pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1134
1135         pa_tagstruct_put(
1136                 t,
1137                 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1138                 PA_TAG_PROPLIST, s->proplist,
1139                 PA_TAG_INVALID);
1140
1141         if (s->direction == PA_STREAM_RECORD)
1142             pa_tagstruct_putu32(t, s->direct_on_input);
1143     }
1144
1145     if (s->context->version >= 14) {
1146
1147         if (s->direction == PA_STREAM_PLAYBACK)
1148             pa_tagstruct_put_boolean(t, volume_set);
1149
1150         pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1151     }
1152
1153     if (s->context->version >= 15) {
1154
1155         if (s->direction == PA_STREAM_PLAYBACK)
1156             pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1157
1158         pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1159         pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1160     }
1161
1162     pa_pstream_send_tagstruct(s->context->pstream, t);
1163     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1164
1165     pa_stream_set_state(s, PA_STREAM_CREATING);
1166
1167     pa_stream_unref(s);
1168     return 0;
1169 }
1170
1171 int pa_stream_connect_playback(
1172         pa_stream *s,
1173         const char *dev,
1174         const pa_buffer_attr *attr,
1175         pa_stream_flags_t flags,
1176         pa_cvolume *volume,
1177         pa_stream *sync_stream) {
1178
1179     pa_assert(s);
1180     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1181
1182     return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1183 }
1184
1185 int pa_stream_connect_record(
1186         pa_stream *s,
1187         const char *dev,
1188         const pa_buffer_attr *attr,
1189         pa_stream_flags_t flags) {
1190
1191     pa_assert(s);
1192     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1193
1194     return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1195 }
1196
1197 int pa_stream_write(
1198         pa_stream *s,
1199         const void *data,
1200         size_t length,
1201         void (*free_cb)(void *p),
1202         int64_t offset,
1203         pa_seek_mode_t seek) {
1204
1205     pa_memchunk chunk;
1206     pa_seek_mode_t t_seek;
1207     int64_t t_offset;
1208     size_t t_length;
1209     const void *t_data;
1210
1211     pa_assert(s);
1212     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1213     pa_assert(data);
1214
1215     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1216     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1217     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1218     PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1219     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1220
1221     if (length <= 0)
1222         return 0;
1223
1224     t_seek = seek;
1225     t_offset = offset;
1226     t_length = length;
1227     t_data = data;
1228
1229     while (t_length > 0) {
1230
1231         chunk.index = 0;
1232
1233         if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1234             chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1235             chunk.length = t_length;
1236         } else {
1237             void *d;
1238
1239             chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1240             chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1241
1242             d = pa_memblock_acquire(chunk.memblock);
1243             memcpy(d, t_data, chunk.length);
1244             pa_memblock_release(chunk.memblock);
1245         }
1246
1247         pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1248
1249         t_offset = 0;
1250         t_seek = PA_SEEK_RELATIVE;
1251
1252         t_data = (const uint8_t*) t_data + chunk.length;
1253         t_length -= chunk.length;
1254
1255         pa_memblock_unref(chunk.memblock);
1256     }
1257
1258     if (free_cb && pa_pstream_get_shm(s->context->pstream))
1259         free_cb((void*) data);
1260
1261     /* This is obviously wrong since we ignore the seeking index . But
1262      * that's OK, the server side applies the same error */
1263     s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1264
1265     if (s->direction == PA_STREAM_PLAYBACK) {
1266
1267         /* Update latency request correction */
1268         if (s->write_index_corrections[s->current_write_index_correction].valid) {
1269
1270             if (seek == PA_SEEK_ABSOLUTE) {
1271                 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1272                 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1273                 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1274             } else if (seek == PA_SEEK_RELATIVE) {
1275                 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1276                     s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1277             } else
1278                 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1279         }
1280
1281         /* Update the write index in the already available latency data */
1282         if (s->timing_info_valid) {
1283
1284             if (seek == PA_SEEK_ABSOLUTE) {
1285                 s->timing_info.write_index_corrupt = FALSE;
1286                 s->timing_info.write_index = offset + (int64_t) length;
1287             } else if (seek == PA_SEEK_RELATIVE) {
1288                 if (!s->timing_info.write_index_corrupt)
1289                     s->timing_info.write_index += offset + (int64_t) length;
1290             } else
1291                 s->timing_info.write_index_corrupt = TRUE;
1292         }
1293
1294         if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1295             request_auto_timing_update(s, TRUE);
1296     }
1297
1298     return 0;
1299 }
1300
1301 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1302     pa_assert(s);
1303     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1304     pa_assert(data);
1305     pa_assert(length);
1306
1307     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1308     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1309     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1310
1311     if (!s->peek_memchunk.memblock) {
1312
1313         if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1314             *data = NULL;
1315             *length = 0;
1316             return 0;
1317         }
1318
1319         s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1320     }
1321
1322     pa_assert(s->peek_data);
1323     *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1324     *length = s->peek_memchunk.length;
1325     return 0;
1326 }
1327
1328 int pa_stream_drop(pa_stream *s) {
1329     pa_assert(s);
1330     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1331
1332     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1333     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1334     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1335     PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1336
1337     pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1338
1339     /* Fix the simulated local read index */
1340     if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1341         s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1342
1343     pa_assert(s->peek_data);
1344     pa_memblock_release(s->peek_memchunk.memblock);
1345     pa_memblock_unref(s->peek_memchunk.memblock);
1346     pa_memchunk_reset(&s->peek_memchunk);
1347
1348     return 0;
1349 }
1350
1351 size_t pa_stream_writable_size(pa_stream *s) {
1352     pa_assert(s);
1353     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1354
1355     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1356     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1357     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1358
1359     return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1360 }
1361
1362 size_t pa_stream_readable_size(pa_stream *s) {
1363     pa_assert(s);
1364     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1365
1366     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1367     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1368     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1369
1370     return pa_memblockq_get_length(s->record_memblockq);
1371 }
1372
1373 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1374     pa_operation *o;
1375     pa_tagstruct *t;
1376     uint32_t tag;
1377
1378     pa_assert(s);
1379     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1380
1381     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1382     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1383     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1384
1385     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1386
1387     t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1388     pa_tagstruct_putu32(t, s->channel);
1389     pa_pstream_send_tagstruct(s->context->pstream, t);
1390     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1391
1392     return o;
1393 }
1394
1395 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1396     pa_usec_t usec;
1397
1398     pa_assert(s);
1399     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1400     pa_assert(s->state == PA_STREAM_READY);
1401     pa_assert(s->direction != PA_STREAM_UPLOAD);
1402     pa_assert(s->timing_info_valid);
1403     pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1404     pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1405
1406     if (s->direction == PA_STREAM_PLAYBACK) {
1407         /* The last byte that was written into the output device
1408          * had this time value associated */
1409         usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1410
1411         if (!s->corked && !s->suspended) {
1412
1413             if (!ignore_transport)
1414                 /* Because the latency info took a little time to come
1415                  * to us, we assume that the real output time is actually
1416                  * a little ahead */
1417                 usec += s->timing_info.transport_usec;
1418
1419             /* However, the output device usually maintains a buffer
1420                too, hence the real sample currently played is a little
1421                back  */
1422             if (s->timing_info.sink_usec >= usec)
1423                 usec = 0;
1424             else
1425                 usec -= s->timing_info.sink_usec;
1426         }
1427
1428     } else {
1429         pa_assert(s->direction == PA_STREAM_RECORD);
1430
1431         /* The last byte written into the server side queue had
1432          * this time value associated */
1433         usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1434
1435         if (!s->corked && !s->suspended) {
1436
1437             if (!ignore_transport)
1438                 /* Add transport latency */
1439                 usec += s->timing_info.transport_usec;
1440
1441             /* Add latency of data in device buffer */
1442             usec += s->timing_info.source_usec;
1443
1444             /* If this is a monitor source, we need to correct the
1445              * time by the playback device buffer */
1446             if (s->timing_info.sink_usec >= usec)
1447                 usec = 0;
1448             else
1449                 usec -= s->timing_info.sink_usec;
1450         }
1451     }
1452
1453     return usec;
1454 }
1455
1456 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1457     pa_operation *o = userdata;
1458     struct timeval local, remote, now;
1459     pa_timing_info *i;
1460     pa_bool_t playing = FALSE;
1461     uint64_t underrun_for = 0, playing_for = 0;
1462
1463     pa_assert(pd);
1464     pa_assert(o);
1465     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1466
1467     if (!o->context || !o->stream)
1468         goto finish;
1469
1470     i = &o->stream->timing_info;
1471
1472     o->stream->timing_info_valid = FALSE;
1473     i->write_index_corrupt = TRUE;
1474     i->read_index_corrupt = TRUE;
1475
1476     if (command != PA_COMMAND_REPLY) {
1477         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1478             goto finish;
1479
1480     } else {
1481
1482         if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1483             pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1484             pa_tagstruct_get_boolean(t, &playing) < 0 ||
1485             pa_tagstruct_get_timeval(t, &local) < 0 ||
1486             pa_tagstruct_get_timeval(t, &remote) < 0 ||
1487             pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1488             pa_tagstruct_gets64(t, &i->read_index) < 0) {
1489
1490             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1491             goto finish;
1492         }
1493
1494         if (o->context->version >= 13 &&
1495             o->stream->direction == PA_STREAM_PLAYBACK)
1496             if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1497                 pa_tagstruct_getu64(t, &playing_for) < 0) {
1498
1499                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1500                 goto finish;
1501             }
1502
1503
1504         if (!pa_tagstruct_eof(t)) {
1505             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1506             goto finish;
1507         }
1508         o->stream->timing_info_valid = TRUE;
1509         i->write_index_corrupt = FALSE;
1510         i->read_index_corrupt = FALSE;
1511
1512         i->playing = (int) playing;
1513         i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1514
1515         pa_gettimeofday(&now);
1516
1517         /* Calculcate timestamps */
1518         if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1519             /* local and remote seem to have synchronized clocks */
1520
1521             if (o->stream->direction == PA_STREAM_PLAYBACK)
1522                 i->transport_usec = pa_timeval_diff(&remote, &local);
1523             else
1524                 i->transport_usec = pa_timeval_diff(&now, &remote);
1525
1526             i->synchronized_clocks = TRUE;
1527             i->timestamp = remote;
1528         } else {
1529             /* clocks are not synchronized, let's estimate latency then */
1530             i->transport_usec = pa_timeval_diff(&now, &local)/2;
1531             i->synchronized_clocks = FALSE;
1532             i->timestamp = local;
1533             pa_timeval_add(&i->timestamp, i->transport_usec);
1534         }
1535
1536         /* Invalidate read and write indexes if necessary */
1537         if (tag < o->stream->read_index_not_before)
1538             i->read_index_corrupt = TRUE;
1539
1540         if (tag < o->stream->write_index_not_before)
1541             i->write_index_corrupt = TRUE;
1542
1543         if (o->stream->direction == PA_STREAM_PLAYBACK) {
1544             /* Write index correction */
1545
1546             int n, j;
1547             uint32_t ctag = tag;
1548
1549             /* Go through the saved correction values and add up the
1550              * total correction.*/
1551             for (n = 0, j = o->stream->current_write_index_correction+1;
1552                  n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1553                  n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1554
1555                 /* Step over invalid data or out-of-date data */
1556                 if (!o->stream->write_index_corrections[j].valid ||
1557                     o->stream->write_index_corrections[j].tag < ctag)
1558                     continue;
1559
1560                 /* Make sure that everything is in order */
1561                 ctag = o->stream->write_index_corrections[j].tag+1;
1562
1563                 /* Now fix the write index */
1564                 if (o->stream->write_index_corrections[j].corrupt) {
1565                     /* A corrupting seek was made */
1566                     i->write_index_corrupt = TRUE;
1567                 } else if (o->stream->write_index_corrections[j].absolute) {
1568                     /* An absolute seek was made */
1569                     i->write_index = o->stream->write_index_corrections[j].value;
1570                     i->write_index_corrupt = FALSE;
1571                 } else if (!i->write_index_corrupt) {
1572                     /* A relative seek was made */
1573                     i->write_index += o->stream->write_index_corrections[j].value;
1574                 }
1575             }
1576
1577             /* Clear old correction entries */
1578             for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1579                 if (!o->stream->write_index_corrections[n].valid)
1580                     continue;
1581
1582                 if (o->stream->write_index_corrections[n].tag <= tag)
1583                     o->stream->write_index_corrections[n].valid = FALSE;
1584             }
1585         }
1586
1587         if (o->stream->direction == PA_STREAM_RECORD) {
1588             /* Read index correction */
1589
1590             if (!i->read_index_corrupt)
1591                 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1592         }
1593
1594         /* Update smoother */
1595         if (o->stream->smoother) {
1596             pa_usec_t u, x;
1597
1598             u = x = pa_rtclock_now() - i->transport_usec;
1599
1600             if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1601                 pa_usec_t su;
1602
1603                 /* If we weren't playing then it will take some time
1604                  * until the audio will actually come out through the
1605                  * speakers. Since we follow that timing here, we need
1606                  * to try to fix this up */
1607
1608                 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1609
1610                 if (su < i->sink_usec)
1611                     x += i->sink_usec - su;
1612             }
1613
1614             if (!i->playing)
1615                 pa_smoother_pause(o->stream->smoother, x);
1616
1617             /* Update the smoother */
1618             if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1619                 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1620                 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1621
1622             if (i->playing)
1623                 pa_smoother_resume(o->stream->smoother, x, TRUE);
1624         }
1625     }
1626
1627     o->stream->auto_timing_update_requested = FALSE;
1628
1629     if (o->stream->latency_update_callback)
1630         o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1631
1632     if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1633         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1634         cb(o->stream, o->stream->timing_info_valid, o->userdata);
1635     }
1636
1637 finish:
1638
1639     pa_operation_done(o);
1640     pa_operation_unref(o);
1641 }
1642
1643 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1644     uint32_t tag;
1645     pa_operation *o;
1646     pa_tagstruct *t;
1647     struct timeval now;
1648     int cidx = 0;
1649
1650     pa_assert(s);
1651     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1652
1653     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1654     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1655     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1656
1657     if (s->direction == PA_STREAM_PLAYBACK) {
1658         /* Find a place to store the write_index correction data for this entry */
1659         cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1660
1661         /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1662         PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1663     }
1664     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1665
1666     t = pa_tagstruct_command(
1667             s->context,
1668             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1669             &tag);
1670     pa_tagstruct_putu32(t, s->channel);
1671     pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1672
1673     pa_pstream_send_tagstruct(s->context->pstream, t);
1674     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1675
1676     if (s->direction == PA_STREAM_PLAYBACK) {
1677         /* Fill in initial correction data */
1678
1679         s->current_write_index_correction = cidx;
1680
1681         s->write_index_corrections[cidx].valid = TRUE;
1682         s->write_index_corrections[cidx].absolute = FALSE;
1683         s->write_index_corrections[cidx].corrupt = FALSE;
1684         s->write_index_corrections[cidx].tag = tag;
1685         s->write_index_corrections[cidx].value = 0;
1686     }
1687
1688     return o;
1689 }
1690
1691 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1692     pa_stream *s = userdata;
1693
1694     pa_assert(pd);
1695     pa_assert(s);
1696     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1697
1698     pa_stream_ref(s);
1699
1700     if (command != PA_COMMAND_REPLY) {
1701         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1702             goto finish;
1703
1704         pa_stream_set_state(s, PA_STREAM_FAILED);
1705         goto finish;
1706     } else if (!pa_tagstruct_eof(t)) {
1707         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1708         goto finish;
1709     }
1710
1711     pa_stream_set_state(s, PA_STREAM_TERMINATED);
1712
1713 finish:
1714     pa_stream_unref(s);
1715 }
1716
1717 int pa_stream_disconnect(pa_stream *s) {
1718     pa_tagstruct *t;
1719     uint32_t tag;
1720
1721     pa_assert(s);
1722     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1723
1724     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1725     PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1726     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1727
1728     pa_stream_ref(s);
1729
1730     t = pa_tagstruct_command(
1731             s->context,
1732             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1733                         (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1734             &tag);
1735     pa_tagstruct_putu32(t, s->channel);
1736     pa_pstream_send_tagstruct(s->context->pstream, t);
1737     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1738
1739     pa_stream_unref(s);
1740     return 0;
1741 }
1742
1743 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1744     pa_assert(s);
1745     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1746
1747     if (pa_detect_fork())
1748         return;
1749
1750     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1751         return;
1752
1753     s->read_callback = cb;
1754     s->read_userdata = userdata;
1755 }
1756
1757 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1758     pa_assert(s);
1759     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1760
1761     if (pa_detect_fork())
1762         return;
1763
1764     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1765         return;
1766
1767     s->write_callback = cb;
1768     s->write_userdata = userdata;
1769 }
1770
1771 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1772     pa_assert(s);
1773     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1774
1775     if (pa_detect_fork())
1776         return;
1777
1778     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1779         return;
1780
1781     s->state_callback = cb;
1782     s->state_userdata = userdata;
1783 }
1784
1785 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1786     pa_assert(s);
1787     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1788
1789     if (pa_detect_fork())
1790         return;
1791
1792     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1793         return;
1794
1795     s->overflow_callback = cb;
1796     s->overflow_userdata = userdata;
1797 }
1798
1799 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1800     pa_assert(s);
1801     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1802
1803     if (pa_detect_fork())
1804         return;
1805
1806     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1807         return;
1808
1809     s->underflow_callback = cb;
1810     s->underflow_userdata = userdata;
1811 }
1812
1813 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1814     pa_assert(s);
1815     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1816
1817     if (pa_detect_fork())
1818         return;
1819
1820     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1821         return;
1822
1823     s->latency_update_callback = cb;
1824     s->latency_update_userdata = userdata;
1825 }
1826
1827 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1828     pa_assert(s);
1829     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1830
1831     if (pa_detect_fork())
1832         return;
1833
1834     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1835         return;
1836
1837     s->moved_callback = cb;
1838     s->moved_userdata = userdata;
1839 }
1840
1841 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1842     pa_assert(s);
1843     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1844
1845     if (pa_detect_fork())
1846         return;
1847
1848     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1849         return;
1850
1851     s->suspended_callback = cb;
1852     s->suspended_userdata = userdata;
1853 }
1854
1855 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1856     pa_assert(s);
1857     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1858
1859     if (pa_detect_fork())
1860         return;
1861
1862     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1863         return;
1864
1865     s->started_callback = cb;
1866     s->started_userdata = userdata;
1867 }
1868
1869 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
1870     pa_assert(s);
1871     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1872
1873     if (pa_detect_fork())
1874         return;
1875
1876     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1877         return;
1878
1879     s->event_callback = cb;
1880     s->event_userdata = userdata;
1881 }
1882
1883 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1884     pa_assert(s);
1885     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1886
1887     if (pa_detect_fork())
1888         return;
1889
1890     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1891         return;
1892
1893     s->buffer_attr_callback = cb;
1894     s->buffer_attr_userdata = userdata;
1895 }
1896
1897 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1898     pa_operation *o = userdata;
1899     int success = 1;
1900
1901     pa_assert(pd);
1902     pa_assert(o);
1903     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1904
1905     if (!o->context)
1906         goto finish;
1907
1908     if (command != PA_COMMAND_REPLY) {
1909         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1910             goto finish;
1911
1912         success = 0;
1913     } else if (!pa_tagstruct_eof(t)) {
1914         pa_context_fail(o->context, PA_ERR_PROTOCOL);
1915         goto finish;
1916     }
1917
1918     if (o->callback) {
1919         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1920         cb(o->stream, success, o->userdata);
1921     }
1922
1923 finish:
1924     pa_operation_done(o);
1925     pa_operation_unref(o);
1926 }
1927
1928 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
1929     pa_operation *o;
1930     pa_tagstruct *t;
1931     uint32_t tag;
1932
1933     pa_assert(s);
1934     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1935
1936     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1937     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1938     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1939
1940     s->corked = b;
1941
1942     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1943
1944     t = pa_tagstruct_command(
1945             s->context,
1946             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
1947             &tag);
1948     pa_tagstruct_putu32(t, s->channel);
1949     pa_tagstruct_put_boolean(t, !!b);
1950     pa_pstream_send_tagstruct(s->context->pstream, t);
1951     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1952
1953     check_smoother_status(s, FALSE, FALSE, FALSE);
1954
1955     /* This might cause the indexes to hang/start again, hence
1956      * let's request a timing update */
1957     request_auto_timing_update(s, TRUE);
1958
1959     return o;
1960 }
1961
1962 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
1963     pa_tagstruct *t;
1964     pa_operation *o;
1965     uint32_t tag;
1966
1967     pa_assert(s);
1968     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1969
1970     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1971     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1972
1973     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1974
1975     t = pa_tagstruct_command(s->context, command, &tag);
1976     pa_tagstruct_putu32(t, s->channel);
1977     pa_pstream_send_tagstruct(s->context->pstream, t);
1978     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1979
1980     return o;
1981 }
1982
1983 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1984     pa_operation *o;
1985
1986     pa_assert(s);
1987     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1988
1989     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1990     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1991     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1992
1993     if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
1994         return NULL;
1995
1996     if (s->direction == PA_STREAM_PLAYBACK) {
1997
1998         if (s->write_index_corrections[s->current_write_index_correction].valid)
1999             s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2000
2001         if (s->buffer_attr.prebuf > 0)
2002             check_smoother_status(s, FALSE, FALSE, TRUE);
2003
2004         /* This will change the write index, but leave the
2005          * read index untouched. */
2006         invalidate_indexes(s, FALSE, TRUE);
2007
2008     } else
2009         /* For record streams this has no influence on the write
2010          * index, but the read index might jump. */
2011         invalidate_indexes(s, TRUE, FALSE);
2012
2013     return o;
2014 }
2015
2016 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2017     pa_operation *o;
2018
2019     pa_assert(s);
2020     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2021
2022     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2023     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2024     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2025     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2026
2027     if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2028         return NULL;
2029
2030     /* This might cause the read index to hang again, hence
2031      * let's request a timing update */
2032     request_auto_timing_update(s, TRUE);
2033
2034     return o;
2035 }
2036
2037 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2038     pa_operation *o;
2039
2040     pa_assert(s);
2041     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2042
2043     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2044     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2045     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2046     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2047
2048     if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2049         return NULL;
2050
2051     /* This might cause the read index to start moving again, hence
2052      * let's request a timing update */
2053     request_auto_timing_update(s, TRUE);
2054
2055     return o;
2056 }
2057
2058 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2059     pa_operation *o;
2060
2061     pa_assert(s);
2062     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2063     pa_assert(name);
2064
2065     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2066     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2067     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2068
2069     if (s->context->version >= 13) {
2070         pa_proplist *p = pa_proplist_new();
2071
2072         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2073         o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2074         pa_proplist_free(p);
2075     } else {
2076         pa_tagstruct *t;
2077         uint32_t tag;
2078
2079         o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2080         t = pa_tagstruct_command(
2081                 s->context,
2082                 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2083                 &tag);
2084         pa_tagstruct_putu32(t, s->channel);
2085         pa_tagstruct_puts(t, name);
2086         pa_pstream_send_tagstruct(s->context->pstream, t);
2087         pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2088     }
2089
2090     return o;
2091 }
2092
2093 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2094     pa_usec_t usec;
2095
2096     pa_assert(s);
2097     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2098
2099     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2100     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2101     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2102     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2103     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2104     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2105
2106     if (s->smoother)
2107         usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2108     else
2109         usec = calc_time(s, FALSE);
2110
2111     /* Make sure the time runs monotonically */
2112     if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2113         if (usec < s->previous_time)
2114             usec = s->previous_time;
2115         else
2116             s->previous_time = usec;
2117     }
2118
2119     if (r_usec)
2120         *r_usec = usec;
2121
2122     return 0;
2123 }
2124
2125 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2126     pa_assert(s);
2127     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2128
2129     if (negative)
2130         *negative = 0;
2131
2132     if (a >= b)
2133         return a-b;
2134     else {
2135         if (negative && s->direction == PA_STREAM_RECORD) {
2136             *negative = 1;
2137             return b-a;
2138         } else
2139             return 0;
2140     }
2141 }
2142
2143 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2144     pa_usec_t t, c;
2145     int r;
2146     int64_t cindex;
2147
2148     pa_assert(s);
2149     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2150     pa_assert(r_usec);
2151
2152     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2153     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2154     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2155     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2156     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2157     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2158
2159     if ((r = pa_stream_get_time(s, &t)) < 0)
2160         return r;
2161
2162     if (s->direction == PA_STREAM_PLAYBACK)
2163         cindex = s->timing_info.write_index;
2164     else
2165         cindex = s->timing_info.read_index;
2166
2167     if (cindex < 0)
2168         cindex = 0;
2169
2170     c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2171
2172     if (s->direction == PA_STREAM_PLAYBACK)
2173         *r_usec = time_counter_diff(s, c, t, negative);
2174     else
2175         *r_usec = time_counter_diff(s, t, c, negative);
2176
2177     return 0;
2178 }
2179
2180 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2181     pa_assert(s);
2182     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2183
2184     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2185     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2186     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2187     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2188
2189     return &s->timing_info;
2190 }
2191
2192 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2193     pa_assert(s);
2194     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2195
2196     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2197
2198     return &s->sample_spec;
2199 }
2200
2201 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2202     pa_assert(s);
2203     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2204
2205     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2206
2207     return &s->channel_map;
2208 }
2209
2210 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2211     pa_assert(s);
2212     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2213
2214     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2215     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2216     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2217
2218     return &s->buffer_attr;
2219 }
2220
2221 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2222     pa_operation *o = userdata;
2223     int success = 1;
2224
2225     pa_assert(pd);
2226     pa_assert(o);
2227     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2228
2229     if (!o->context)
2230         goto finish;
2231
2232     if (command != PA_COMMAND_REPLY) {
2233         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2234             goto finish;
2235
2236         success = 0;
2237     } else {
2238         if (o->stream->direction == PA_STREAM_PLAYBACK) {
2239             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2240                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2241                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2242                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2243                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2244                 goto finish;
2245             }
2246         } else if (o->stream->direction == PA_STREAM_RECORD) {
2247             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2248                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2249                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2250                 goto finish;
2251             }
2252         }
2253
2254         if (o->stream->context->version >= 13) {
2255             pa_usec_t usec;
2256
2257             if (pa_tagstruct_get_usec(t, &usec) < 0) {
2258                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2259                 goto finish;
2260             }
2261
2262             if (o->stream->direction == PA_STREAM_RECORD)
2263                 o->stream->timing_info.configured_source_usec = usec;
2264             else
2265                 o->stream->timing_info.configured_sink_usec = usec;
2266         }
2267
2268         if (!pa_tagstruct_eof(t)) {
2269             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2270             goto finish;
2271         }
2272     }
2273
2274     if (o->callback) {
2275         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2276         cb(o->stream, success, o->userdata);
2277     }
2278
2279 finish:
2280     pa_operation_done(o);
2281     pa_operation_unref(o);
2282 }
2283
2284
2285 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2286     pa_operation *o;
2287     pa_tagstruct *t;
2288     uint32_t tag;
2289
2290     pa_assert(s);
2291     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2292     pa_assert(attr);
2293
2294     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2295     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2296     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2297     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2298
2299     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2300
2301     t = pa_tagstruct_command(
2302             s->context,
2303             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2304             &tag);
2305     pa_tagstruct_putu32(t, s->channel);
2306
2307     pa_tagstruct_putu32(t, attr->maxlength);
2308
2309     if (s->direction == PA_STREAM_PLAYBACK)
2310         pa_tagstruct_put(
2311                 t,
2312                 PA_TAG_U32, attr->tlength,
2313                 PA_TAG_U32, attr->prebuf,
2314                 PA_TAG_U32, attr->minreq,
2315                 PA_TAG_INVALID);
2316     else
2317         pa_tagstruct_putu32(t, attr->fragsize);
2318
2319     if (s->context->version >= 13)
2320         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2321
2322     if (s->context->version >= 14)
2323         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2324
2325     pa_pstream_send_tagstruct(s->context->pstream, t);
2326     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2327
2328     /* This might cause changes in the read/write indexex, hence let's
2329      * request a timing update */
2330     request_auto_timing_update(s, TRUE);
2331
2332     return o;
2333 }
2334
2335 uint32_t pa_stream_get_device_index(pa_stream *s) {
2336     pa_assert(s);
2337     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2338
2339     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2340     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2341     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2342     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2343     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2344
2345     return s->device_index;
2346 }
2347
2348 const char *pa_stream_get_device_name(pa_stream *s) {
2349     pa_assert(s);
2350     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2351
2352     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2353     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2354     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2355     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2356     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2357
2358     return s->device_name;
2359 }
2360
2361 int pa_stream_is_suspended(pa_stream *s) {
2362     pa_assert(s);
2363     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2364
2365     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2366     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2367     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2368     PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2369
2370     return s->suspended;
2371 }
2372
2373 int pa_stream_is_corked(pa_stream *s) {
2374     pa_assert(s);
2375     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2376
2377     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2378     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2379     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2380
2381     return s->corked;
2382 }
2383
2384 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2385     pa_operation *o = userdata;
2386     int success = 1;
2387
2388     pa_assert(pd);
2389     pa_assert(o);
2390     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2391
2392     if (!o->context)
2393         goto finish;
2394
2395     if (command != PA_COMMAND_REPLY) {
2396         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2397             goto finish;
2398
2399         success = 0;
2400     } else {
2401
2402         if (!pa_tagstruct_eof(t)) {
2403             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2404             goto finish;
2405         }
2406     }
2407
2408     o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2409     pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2410
2411     if (o->callback) {
2412         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2413         cb(o->stream, success, o->userdata);
2414     }
2415
2416 finish:
2417     pa_operation_done(o);
2418     pa_operation_unref(o);
2419 }
2420
2421
2422 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2423     pa_operation *o;
2424     pa_tagstruct *t;
2425     uint32_t tag;
2426
2427     pa_assert(s);
2428     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2429
2430     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2431     PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2432     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2433     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2434     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2435     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2436
2437     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2438     o->private = PA_UINT_TO_PTR(rate);
2439
2440     t = pa_tagstruct_command(
2441             s->context,
2442             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2443             &tag);
2444     pa_tagstruct_putu32(t, s->channel);
2445     pa_tagstruct_putu32(t, rate);
2446
2447     pa_pstream_send_tagstruct(s->context->pstream, t);
2448     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2449
2450     return o;
2451 }
2452
2453 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2454     pa_operation *o;
2455     pa_tagstruct *t;
2456     uint32_t tag;
2457
2458     pa_assert(s);
2459     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2460
2461     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2462     PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2463     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2464     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2465     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2466
2467     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2468
2469     t = pa_tagstruct_command(
2470             s->context,
2471             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2472             &tag);
2473     pa_tagstruct_putu32(t, s->channel);
2474     pa_tagstruct_putu32(t, (uint32_t) mode);
2475     pa_tagstruct_put_proplist(t, p);
2476
2477     pa_pstream_send_tagstruct(s->context->pstream, t);
2478     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2479
2480     /* Please note that we don't update s->proplist here, because we
2481      * don't export that field */
2482
2483     return o;
2484 }
2485
2486 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2487     pa_operation *o;
2488     pa_tagstruct *t;
2489     uint32_t tag;
2490     const char * const*k;
2491
2492     pa_assert(s);
2493     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2494
2495     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2496     PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2497     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2498     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2499     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2500
2501     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2502
2503     t = pa_tagstruct_command(
2504             s->context,
2505             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2506             &tag);
2507     pa_tagstruct_putu32(t, s->channel);
2508
2509     for (k = keys; *k; k++)
2510         pa_tagstruct_puts(t, *k);
2511
2512     pa_tagstruct_puts(t, NULL);
2513
2514     pa_pstream_send_tagstruct(s->context->pstream, t);
2515     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2516
2517     /* Please note that we don't update s->proplist here, because we
2518      * don't export that field */
2519
2520     return o;
2521 }
2522
2523 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2524     pa_assert(s);
2525     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2526
2527     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2528     PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2529     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2530     PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2531
2532     s->direct_on_input = sink_input_idx;
2533
2534     return 0;
2535 }
2536
2537 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2538     pa_assert(s);
2539     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2540
2541     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2542     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2543     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2544
2545     return s->direct_on_input;
2546 }