minimal reordering
[profile/ivi/pulseaudio.git] / src / pulse / stream.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/xmalloc.h>
34
35 #include <pulsecore/pstream-util.h>
36 #include <pulsecore/log.h>
37 #include <pulsecore/hashmap.h>
38 #include <pulsecore/macro.h>
39 #include <pulsecore/rtclock.h>
40
41 #include "fork-detect.h"
42 #include "internal.h"
43
44 #define LATENCY_IPOL_INTERVAL_USEC (333*PA_USEC_PER_MSEC)
45
46 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
47 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
48 #define SMOOTHER_MIN_HISTORY (4)
49
50 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
51     return pa_stream_new_with_proplist(c, name, ss, map, NULL);
52 }
53
54 static void reset_callbacks(pa_stream *s) {
55     s->read_callback = NULL;
56     s->read_userdata = NULL;
57     s->write_callback = NULL;
58     s->write_userdata = NULL;
59     s->state_callback = NULL;
60     s->state_userdata = NULL;
61     s->overflow_callback = NULL;
62     s->overflow_userdata = NULL;
63     s->underflow_callback = NULL;
64     s->underflow_userdata = NULL;
65     s->latency_update_callback = NULL;
66     s->latency_update_userdata = NULL;
67     s->moved_callback = NULL;
68     s->moved_userdata = NULL;
69     s->suspended_callback = NULL;
70     s->suspended_userdata = NULL;
71     s->started_callback = NULL;
72     s->started_userdata = NULL;
73     s->event_callback = NULL;
74     s->event_userdata = NULL;
75 }
76
77 pa_stream *pa_stream_new_with_proplist(
78         pa_context *c,
79         const char *name,
80         const pa_sample_spec *ss,
81         const pa_channel_map *map,
82         pa_proplist *p) {
83
84     pa_stream *s;
85     int i;
86     pa_channel_map tmap;
87
88     pa_assert(c);
89     pa_assert(PA_REFCNT_VALUE(c) >= 1);
90
91     PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
92     PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
93     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
94     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
95     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
96     PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
97     PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
98
99     if (!map)
100         PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
101
102     s = pa_xnew(pa_stream, 1);
103     PA_REFCNT_INIT(s);
104     s->context = c;
105     s->mainloop = c->mainloop;
106
107     s->direction = PA_STREAM_NODIRECTION;
108     s->state = PA_STREAM_UNCONNECTED;
109     s->flags = 0;
110
111     s->sample_spec = *ss;
112     s->channel_map = *map;
113
114     s->direct_on_input = PA_INVALID_INDEX;
115
116     s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
117     if (name)
118         pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
119
120     s->channel = 0;
121     s->channel_valid = FALSE;
122     s->syncid = c->csyncid++;
123     s->stream_index = PA_INVALID_INDEX;
124
125     s->requested_bytes = 0;
126     memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
127
128     /* We initialize der target length here, so that if the user
129      * passes no explicit buffering metrics the default is similar to
130      * what older PA versions provided. */
131
132     s->buffer_attr.maxlength = (uint32_t) -1;
133     s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
134     s->buffer_attr.minreq = (uint32_t) -1;
135     s->buffer_attr.prebuf = (uint32_t) -1;
136     s->buffer_attr.fragsize = (uint32_t) -1;
137
138     s->device_index = PA_INVALID_INDEX;
139     s->device_name = NULL;
140     s->suspended = FALSE;
141     s->corked = FALSE;
142
143     pa_memchunk_reset(&s->peek_memchunk);
144     s->peek_data = NULL;
145
146     s->record_memblockq = NULL;
147
148
149     memset(&s->timing_info, 0, sizeof(s->timing_info));
150     s->timing_info_valid = FALSE;
151
152     s->previous_time = 0;
153
154     s->read_index_not_before = 0;
155     s->write_index_not_before = 0;
156     for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
157         s->write_index_corrections[i].valid = 0;
158     s->current_write_index_correction = 0;
159
160     s->auto_timing_update_event = NULL;
161     s->auto_timing_update_requested = FALSE;
162
163     reset_callbacks(s);
164
165     s->smoother = NULL;
166
167     /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
168     PA_LLIST_PREPEND(pa_stream, c->streams, s);
169     pa_stream_ref(s);
170
171     return s;
172 }
173
174 static void stream_unlink(pa_stream *s) {
175     pa_operation *o, *n;
176     pa_assert(s);
177
178     if (!s->context)
179         return;
180
181     /* Detach from context */
182
183     /* Unref all operatio object that point to us */
184     for (o = s->context->operations; o; o = n) {
185         n = o->next;
186
187         if (o->stream == s)
188             pa_operation_cancel(o);
189     }
190
191     /* Drop all outstanding replies for this stream */
192     if (s->context->pdispatch)
193         pa_pdispatch_unregister_reply(s->context->pdispatch, s);
194
195     if (s->channel_valid) {
196         pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL);
197         s->channel = 0;
198         s->channel_valid = FALSE;
199     }
200
201     PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
202     pa_stream_unref(s);
203
204     s->context = NULL;
205
206     if (s->auto_timing_update_event) {
207         pa_assert(s->mainloop);
208         s->mainloop->time_free(s->auto_timing_update_event);
209     }
210
211     reset_callbacks(s);
212 }
213
214 static void stream_free(pa_stream *s) {
215     pa_assert(s);
216
217     stream_unlink(s);
218
219     if (s->peek_memchunk.memblock) {
220         if (s->peek_data)
221             pa_memblock_release(s->peek_memchunk.memblock);
222         pa_memblock_unref(s->peek_memchunk.memblock);
223     }
224
225     if (s->record_memblockq)
226         pa_memblockq_free(s->record_memblockq);
227
228     if (s->proplist)
229         pa_proplist_free(s->proplist);
230
231     if (s->smoother)
232         pa_smoother_free(s->smoother);
233
234     pa_xfree(s->device_name);
235     pa_xfree(s);
236 }
237
238 void pa_stream_unref(pa_stream *s) {
239     pa_assert(s);
240     pa_assert(PA_REFCNT_VALUE(s) >= 1);
241
242     if (PA_REFCNT_DEC(s) <= 0)
243         stream_free(s);
244 }
245
246 pa_stream* pa_stream_ref(pa_stream *s) {
247     pa_assert(s);
248     pa_assert(PA_REFCNT_VALUE(s) >= 1);
249
250     PA_REFCNT_INC(s);
251     return s;
252 }
253
254 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
255     pa_assert(s);
256     pa_assert(PA_REFCNT_VALUE(s) >= 1);
257
258     return s->state;
259 }
260
261 pa_context* pa_stream_get_context(pa_stream *s) {
262     pa_assert(s);
263     pa_assert(PA_REFCNT_VALUE(s) >= 1);
264
265     return s->context;
266 }
267
268 uint32_t pa_stream_get_index(pa_stream *s) {
269     pa_assert(s);
270     pa_assert(PA_REFCNT_VALUE(s) >= 1);
271
272     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
273     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
274
275     return s->stream_index;
276 }
277
278 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
279     pa_assert(s);
280     pa_assert(PA_REFCNT_VALUE(s) >= 1);
281
282     if (s->state == st)
283         return;
284
285     pa_stream_ref(s);
286
287     s->state = st;
288
289     if (s->state_callback)
290         s->state_callback(s, s->state_userdata);
291
292     if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
293         stream_unlink(s);
294
295     pa_stream_unref(s);
296 }
297
298 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
299     pa_assert(s);
300     pa_assert(PA_REFCNT_VALUE(s) >= 1);
301
302     if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
303         return;
304
305     if (s->state == PA_STREAM_READY &&
306         (force || !s->auto_timing_update_requested)) {
307         pa_operation *o;
308
309 /*         pa_log("automatically requesting new timing data"); */
310
311         if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
312             pa_operation_unref(o);
313             s->auto_timing_update_requested = TRUE;
314         }
315     }
316
317     if (s->auto_timing_update_event) {
318         struct timeval next;
319         pa_gettimeofday(&next);
320         pa_timeval_add(&next, LATENCY_IPOL_INTERVAL_USEC);
321         s->mainloop->time_restart(s->auto_timing_update_event, &next);
322     }
323 }
324
325 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
326     pa_context *c = userdata;
327     pa_stream *s;
328     uint32_t channel;
329
330     pa_assert(pd);
331     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
332     pa_assert(t);
333     pa_assert(c);
334     pa_assert(PA_REFCNT_VALUE(c) >= 1);
335
336     pa_context_ref(c);
337
338     if (pa_tagstruct_getu32(t, &channel) < 0 ||
339         !pa_tagstruct_eof(t)) {
340         pa_context_fail(c, PA_ERR_PROTOCOL);
341         goto finish;
342     }
343
344     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel)))
345         goto finish;
346
347     if (s->state != PA_STREAM_READY)
348         goto finish;
349
350     pa_context_set_error(c, PA_ERR_KILLED);
351     pa_stream_set_state(s, PA_STREAM_FAILED);
352
353 finish:
354     pa_context_unref(c);
355 }
356
357 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
358     pa_usec_t x;
359
360     pa_assert(s);
361     pa_assert(!force_start || !force_stop);
362
363     if (!s->smoother)
364         return;
365
366     x = pa_rtclock_usec();
367
368     if (s->timing_info_valid) {
369         if (aposteriori)
370             x -= s->timing_info.transport_usec;
371         else
372             x += s->timing_info.transport_usec;
373
374         if (s->direction == PA_STREAM_PLAYBACK)
375             /* it takes a while until the pause/resume is actually
376              * audible */
377             x += s->timing_info.sink_usec;
378         else
379             /* Data froma  while back will be dropped */
380             x -= s->timing_info.source_usec;
381     }
382
383     if (s->suspended || s->corked || force_stop)
384         pa_smoother_pause(s->smoother, x);
385     else if (force_start || s->buffer_attr.prebuf == 0)
386         pa_smoother_resume(s->smoother, x);
387
388     /* Please note that we have no idea if playback actually started
389      * if prebuf is non-zero! */
390 }
391
392 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
393     pa_context *c = userdata;
394     pa_stream *s;
395     uint32_t channel;
396     const char *dn;
397     pa_bool_t suspended;
398     uint32_t di;
399     pa_usec_t usec;
400     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
401
402     pa_assert(pd);
403     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
404     pa_assert(t);
405     pa_assert(c);
406     pa_assert(PA_REFCNT_VALUE(c) >= 1);
407
408     pa_context_ref(c);
409
410     if (c->version < 12) {
411         pa_context_fail(c, PA_ERR_PROTOCOL);
412         goto finish;
413     }
414
415     if (pa_tagstruct_getu32(t, &channel) < 0 ||
416         pa_tagstruct_getu32(t, &di) < 0 ||
417         pa_tagstruct_gets(t, &dn) < 0 ||
418         pa_tagstruct_get_boolean(t, &suspended) < 0) {
419         pa_context_fail(c, PA_ERR_PROTOCOL);
420         goto finish;
421     }
422
423     if (c->version >= 13) {
424
425         if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
426             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
427                 pa_tagstruct_getu32(t, &fragsize) < 0 ||
428                 pa_tagstruct_get_usec(t, &usec) < 0) {
429                 pa_context_fail(c, PA_ERR_PROTOCOL);
430                 goto finish;
431             }
432         } else {
433             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
434                 pa_tagstruct_getu32(t, &tlength) < 0 ||
435                 pa_tagstruct_getu32(t, &prebuf) < 0 ||
436                 pa_tagstruct_getu32(t, &minreq) < 0 ||
437                 pa_tagstruct_get_usec(t, &usec) < 0) {
438                 pa_context_fail(c, PA_ERR_PROTOCOL);
439                 goto finish;
440             }
441         }
442     }
443
444     if (!pa_tagstruct_eof(t)) {
445         pa_context_fail(c, PA_ERR_PROTOCOL);
446         goto finish;
447     }
448
449     if (!dn || di == PA_INVALID_INDEX) {
450         pa_context_fail(c, PA_ERR_PROTOCOL);
451         goto finish;
452     }
453
454     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, channel)))
455         goto finish;
456
457     if (s->state != PA_STREAM_READY)
458         goto finish;
459
460     if (c->version >= 13) {
461         if (s->direction == PA_STREAM_RECORD)
462             s->timing_info.configured_source_usec = usec;
463         else
464             s->timing_info.configured_sink_usec = usec;
465
466         s->buffer_attr.maxlength = maxlength;
467         s->buffer_attr.fragsize = fragsize;
468         s->buffer_attr.tlength = tlength;
469         s->buffer_attr.prebuf = prebuf;
470         s->buffer_attr.minreq = minreq;
471     }
472
473     pa_xfree(s->device_name);
474     s->device_name = pa_xstrdup(dn);
475     s->device_index = di;
476
477     s->suspended = suspended;
478
479     check_smoother_status(s, TRUE, FALSE, FALSE);
480     request_auto_timing_update(s, TRUE);
481
482     if (s->moved_callback)
483         s->moved_callback(s, s->moved_userdata);
484
485 finish:
486     pa_context_unref(c);
487 }
488
489 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
490     pa_context *c = userdata;
491     pa_stream *s;
492     uint32_t channel;
493     pa_bool_t suspended;
494
495     pa_assert(pd);
496     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
497     pa_assert(t);
498     pa_assert(c);
499     pa_assert(PA_REFCNT_VALUE(c) >= 1);
500
501     pa_context_ref(c);
502
503     if (c->version < 12) {
504         pa_context_fail(c, PA_ERR_PROTOCOL);
505         goto finish;
506     }
507
508     if (pa_tagstruct_getu32(t, &channel) < 0 ||
509         pa_tagstruct_get_boolean(t, &suspended) < 0 ||
510         !pa_tagstruct_eof(t)) {
511         pa_context_fail(c, PA_ERR_PROTOCOL);
512         goto finish;
513     }
514
515     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, channel)))
516         goto finish;
517
518     if (s->state != PA_STREAM_READY)
519         goto finish;
520
521     s->suspended = suspended;
522
523     check_smoother_status(s, TRUE, FALSE, FALSE);
524     request_auto_timing_update(s, TRUE);
525
526     if (s->suspended_callback)
527         s->suspended_callback(s, s->suspended_userdata);
528
529 finish:
530     pa_context_unref(c);
531 }
532
533 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
534     pa_context *c = userdata;
535     pa_stream *s;
536     uint32_t channel;
537
538     pa_assert(pd);
539     pa_assert(command == PA_COMMAND_STARTED);
540     pa_assert(t);
541     pa_assert(c);
542     pa_assert(PA_REFCNT_VALUE(c) >= 1);
543
544     pa_context_ref(c);
545
546     if (c->version < 13) {
547         pa_context_fail(c, PA_ERR_PROTOCOL);
548         goto finish;
549     }
550
551     if (pa_tagstruct_getu32(t, &channel) < 0 ||
552         !pa_tagstruct_eof(t)) {
553         pa_context_fail(c, PA_ERR_PROTOCOL);
554         goto finish;
555     }
556
557     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
558         goto finish;
559
560     if (s->state != PA_STREAM_READY)
561         goto finish;
562
563     check_smoother_status(s, TRUE, TRUE, FALSE);
564     request_auto_timing_update(s, TRUE);
565
566     if (s->started_callback)
567         s->started_callback(s, s->started_userdata);
568
569 finish:
570     pa_context_unref(c);
571 }
572
573 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
574     pa_context *c = userdata;
575     pa_stream *s;
576     uint32_t channel;
577     pa_proplist *pl = NULL;
578     const char *event;
579
580     pa_assert(pd);
581     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
582     pa_assert(t);
583     pa_assert(c);
584     pa_assert(PA_REFCNT_VALUE(c) >= 1);
585
586     pa_context_ref(c);
587
588     if (c->version < 15) {
589         pa_context_fail(c, PA_ERR_PROTOCOL);
590         goto finish;
591     }
592
593     pl = pa_proplist_new();
594
595     if (pa_tagstruct_getu32(t, &channel) < 0 ||
596         pa_tagstruct_gets(t, &event) < 0 ||
597         pa_tagstruct_get_proplist(t, pl) < 0 ||
598         !pa_tagstruct_eof(t) || !event) {
599         pa_context_fail(c, PA_ERR_PROTOCOL);
600         goto finish;
601     }
602
603     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, channel)))
604         goto finish;
605
606     if (s->state != PA_STREAM_READY)
607         goto finish;
608
609     if (s->event_callback)
610         s->event_callback(s, event, pl, s->event_userdata);
611
612 finish:
613     pa_context_unref(c);
614
615     if (pl)
616         pa_proplist_free(pl);
617 }
618
619 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
620     pa_stream *s;
621     pa_context *c = userdata;
622     uint32_t bytes, channel;
623
624     pa_assert(pd);
625     pa_assert(command == PA_COMMAND_REQUEST);
626     pa_assert(t);
627     pa_assert(c);
628     pa_assert(PA_REFCNT_VALUE(c) >= 1);
629
630     pa_context_ref(c);
631
632     if (pa_tagstruct_getu32(t, &channel) < 0 ||
633         pa_tagstruct_getu32(t, &bytes) < 0 ||
634         !pa_tagstruct_eof(t)) {
635         pa_context_fail(c, PA_ERR_PROTOCOL);
636         goto finish;
637     }
638
639     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
640         goto finish;
641
642     if (s->state != PA_STREAM_READY)
643         goto finish;
644
645     s->requested_bytes += bytes;
646
647     if (s->requested_bytes > 0 && s->write_callback)
648         s->write_callback(s, s->requested_bytes, s->write_userdata);
649
650 finish:
651     pa_context_unref(c);
652 }
653
654 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
655     pa_stream *s;
656     pa_context *c = userdata;
657     uint32_t channel;
658
659     pa_assert(pd);
660     pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
661     pa_assert(t);
662     pa_assert(c);
663     pa_assert(PA_REFCNT_VALUE(c) >= 1);
664
665     pa_context_ref(c);
666
667     if (pa_tagstruct_getu32(t, &channel) < 0 ||
668         !pa_tagstruct_eof(t)) {
669         pa_context_fail(c, PA_ERR_PROTOCOL);
670         goto finish;
671     }
672
673     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
674         goto finish;
675
676     if (s->state != PA_STREAM_READY)
677         goto finish;
678
679     if (s->buffer_attr.prebuf > 0)
680         check_smoother_status(s, TRUE, FALSE, TRUE);
681
682     request_auto_timing_update(s, TRUE);
683
684     if (command == PA_COMMAND_OVERFLOW) {
685         if (s->overflow_callback)
686             s->overflow_callback(s, s->overflow_userdata);
687     } else if (command == PA_COMMAND_UNDERFLOW) {
688         if (s->underflow_callback)
689             s->underflow_callback(s, s->underflow_userdata);
690     }
691
692  finish:
693     pa_context_unref(c);
694 }
695
696 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
697     pa_assert(s);
698     pa_assert(PA_REFCNT_VALUE(s) >= 1);
699
700 /*     pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
701
702     if (s->state != PA_STREAM_READY)
703         return;
704
705     if (w) {
706         s->write_index_not_before = s->context->ctag;
707
708         if (s->timing_info_valid)
709             s->timing_info.write_index_corrupt = TRUE;
710
711 /*         pa_log("write_index invalidated"); */
712     }
713
714     if (r) {
715         s->read_index_not_before = s->context->ctag;
716
717         if (s->timing_info_valid)
718             s->timing_info.read_index_corrupt = TRUE;
719
720 /*         pa_log("read_index invalidated"); */
721     }
722
723     request_auto_timing_update(s, TRUE);
724 }
725
726 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *tv, void *userdata) {
727     pa_stream *s = userdata;
728
729     pa_assert(s);
730     pa_assert(PA_REFCNT_VALUE(s) >= 1);
731
732     pa_stream_ref(s);
733     request_auto_timing_update(s, FALSE);
734     pa_stream_unref(s);
735 }
736
737 static void create_stream_complete(pa_stream *s) {
738     pa_assert(s);
739     pa_assert(PA_REFCNT_VALUE(s) >= 1);
740     pa_assert(s->state == PA_STREAM_CREATING);
741
742     pa_stream_set_state(s, PA_STREAM_READY);
743
744     if (s->requested_bytes > 0 && s->write_callback)
745         s->write_callback(s, s->requested_bytes, s->write_userdata);
746
747     if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
748         struct timeval tv;
749         pa_gettimeofday(&tv);
750         tv.tv_usec += (suseconds_t) LATENCY_IPOL_INTERVAL_USEC; /* every 100 ms */
751         pa_assert(!s->auto_timing_update_event);
752         s->auto_timing_update_event = s->mainloop->time_new(s->mainloop, &tv, &auto_timing_update_callback, s);
753
754         request_auto_timing_update(s, TRUE);
755     }
756
757     check_smoother_status(s, TRUE, FALSE, FALSE);
758 }
759
760 static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_sample_spec *ss) {
761     pa_assert(s);
762     pa_assert(attr);
763     pa_assert(ss);
764
765     if (s->context->version >= 13)
766         return;
767
768     /* Version older than 0.9.10 didn't do server side buffer_attr
769      * selection, hence we have to fake it on the client side. */
770
771     /* We choose fairly conservative values here, to not confuse
772      * old clients with extremely large playback buffers */
773
774     if (attr->maxlength == (uint32_t) -1)
775         attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
776
777     if (attr->tlength == (uint32_t) -1)
778         attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
779
780     if (attr->minreq == (uint32_t) -1)
781         attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
782
783     if (attr->prebuf == (uint32_t) -1)
784         attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
785
786     if (attr->fragsize == (uint32_t) -1)
787         attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
788 }
789
790 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
791     pa_stream *s = userdata;
792
793     pa_assert(pd);
794     pa_assert(s);
795     pa_assert(PA_REFCNT_VALUE(s) >= 1);
796     pa_assert(s->state == PA_STREAM_CREATING);
797
798     pa_stream_ref(s);
799
800     if (command != PA_COMMAND_REPLY) {
801         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
802             goto finish;
803
804         pa_stream_set_state(s, PA_STREAM_FAILED);
805         goto finish;
806     }
807
808     if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
809         s->channel == PA_INVALID_INDEX ||
810         ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 ||  s->stream_index == PA_INVALID_INDEX)) ||
811         ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &s->requested_bytes) < 0)) {
812         pa_context_fail(s->context, PA_ERR_PROTOCOL);
813         goto finish;
814     }
815
816     if (s->context->version >= 9) {
817         if (s->direction == PA_STREAM_PLAYBACK) {
818             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
819                 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
820                 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
821                 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
822                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
823                 goto finish;
824             }
825         } else if (s->direction == PA_STREAM_RECORD) {
826             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
827                 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
828                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
829                 goto finish;
830             }
831         }
832     }
833
834     if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
835         pa_sample_spec ss;
836         pa_channel_map cm;
837         const char *dn = NULL;
838         pa_bool_t suspended;
839
840         if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
841             pa_tagstruct_get_channel_map(t, &cm) < 0 ||
842             pa_tagstruct_getu32(t, &s->device_index) < 0 ||
843             pa_tagstruct_gets(t, &dn) < 0 ||
844             pa_tagstruct_get_boolean(t, &suspended) < 0) {
845             pa_context_fail(s->context, PA_ERR_PROTOCOL);
846             goto finish;
847         }
848
849         if (!dn || s->device_index == PA_INVALID_INDEX ||
850             ss.channels != cm.channels ||
851             !pa_channel_map_valid(&cm) ||
852             !pa_sample_spec_valid(&ss) ||
853             (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
854             (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
855             (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))) {
856             pa_context_fail(s->context, PA_ERR_PROTOCOL);
857             goto finish;
858         }
859
860         pa_xfree(s->device_name);
861         s->device_name = pa_xstrdup(dn);
862         s->suspended = suspended;
863
864         s->channel_map = cm;
865         s->sample_spec = ss;
866     }
867
868     if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
869         pa_usec_t usec;
870
871         if (pa_tagstruct_get_usec(t, &usec) < 0) {
872             pa_context_fail(s->context, PA_ERR_PROTOCOL);
873             goto finish;
874         }
875
876         if (s->direction == PA_STREAM_RECORD)
877             s->timing_info.configured_source_usec = usec;
878         else
879             s->timing_info.configured_sink_usec = usec;
880     }
881
882     if (!pa_tagstruct_eof(t)) {
883         pa_context_fail(s->context, PA_ERR_PROTOCOL);
884         goto finish;
885     }
886
887     if (s->direction == PA_STREAM_RECORD) {
888         pa_assert(!s->record_memblockq);
889
890         s->record_memblockq = pa_memblockq_new(
891                 0,
892                 s->buffer_attr.maxlength,
893                 0,
894                 pa_frame_size(&s->sample_spec),
895                 1,
896                 0,
897                 0,
898                 NULL);
899     }
900
901     s->channel_valid = TRUE;
902     pa_dynarray_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, s->channel, s);
903
904     create_stream_complete(s);
905
906 finish:
907     pa_stream_unref(s);
908 }
909
910 static int create_stream(
911         pa_stream_direction_t direction,
912         pa_stream *s,
913         const char *dev,
914         const pa_buffer_attr *attr,
915         pa_stream_flags_t flags,
916         const pa_cvolume *volume,
917         pa_stream *sync_stream) {
918
919     pa_tagstruct *t;
920     uint32_t tag;
921     pa_bool_t volume_set = FALSE;
922
923     pa_assert(s);
924     pa_assert(PA_REFCNT_VALUE(s) >= 1);
925     pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
926
927     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
928     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
929     PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
930     PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
931                                               PA_STREAM_INTERPOLATE_TIMING|
932                                               PA_STREAM_NOT_MONOTONIC|
933                                               PA_STREAM_AUTO_TIMING_UPDATE|
934                                               PA_STREAM_NO_REMAP_CHANNELS|
935                                               PA_STREAM_NO_REMIX_CHANNELS|
936                                               PA_STREAM_FIX_FORMAT|
937                                               PA_STREAM_FIX_RATE|
938                                               PA_STREAM_FIX_CHANNELS|
939                                               PA_STREAM_DONT_MOVE|
940                                               PA_STREAM_VARIABLE_RATE|
941                                               PA_STREAM_PEAK_DETECT|
942                                               PA_STREAM_START_MUTED|
943                                               PA_STREAM_ADJUST_LATENCY|
944                                               PA_STREAM_EARLY_REQUESTS|
945                                               PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
946                                               PA_STREAM_START_UNMUTED|
947                                               PA_STREAM_FAIL_ON_SUSPEND)), PA_ERR_INVALID);
948
949     PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
950     PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
951     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
952     /* Althought some of the other flags are not supported on older
953      * version, we don't check for them here, because it doesn't hurt
954      * when they are passed but actually not supported. This makes
955      * client development easier */
956
957     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
958     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
959     PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
960     PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
961     PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
962
963     pa_stream_ref(s);
964
965     s->direction = direction;
966     s->flags = flags;
967     s->corked = !!(flags & PA_STREAM_START_CORKED);
968
969     if (sync_stream)
970         s->syncid = sync_stream->syncid;
971
972     if (attr)
973         s->buffer_attr = *attr;
974     automatic_buffer_attr(s, &s->buffer_attr, &s->sample_spec);
975
976     if (flags & PA_STREAM_INTERPOLATE_TIMING) {
977         pa_usec_t x;
978
979         if (s->smoother)
980             pa_smoother_free(s->smoother);
981
982         s->smoother = pa_smoother_new(SMOOTHER_ADJUST_TIME, SMOOTHER_HISTORY_TIME, !(flags & PA_STREAM_NOT_MONOTONIC), SMOOTHER_MIN_HISTORY);
983
984         x = pa_rtclock_usec();
985         pa_smoother_set_time_offset(s->smoother, x);
986         pa_smoother_pause(s->smoother, x);
987     }
988
989     if (!dev)
990         dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
991
992     t = pa_tagstruct_command(
993             s->context,
994             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
995             &tag);
996
997     if (s->context->version < 13)
998         pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
999
1000     pa_tagstruct_put(
1001             t,
1002             PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1003             PA_TAG_CHANNEL_MAP, &s->channel_map,
1004             PA_TAG_U32, PA_INVALID_INDEX,
1005             PA_TAG_STRING, dev,
1006             PA_TAG_U32, s->buffer_attr.maxlength,
1007             PA_TAG_BOOLEAN, s->corked,
1008             PA_TAG_INVALID);
1009
1010     if (s->direction == PA_STREAM_PLAYBACK) {
1011         pa_cvolume cv;
1012
1013         pa_tagstruct_put(
1014                 t,
1015                 PA_TAG_U32, s->buffer_attr.tlength,
1016                 PA_TAG_U32, s->buffer_attr.prebuf,
1017                 PA_TAG_U32, s->buffer_attr.minreq,
1018                 PA_TAG_U32, s->syncid,
1019                 PA_TAG_INVALID);
1020
1021         volume_set = !!volume;
1022
1023         if (!volume)
1024             volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1025
1026         pa_tagstruct_put_cvolume(t, volume);
1027     } else
1028         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1029
1030     if (s->context->version >= 12) {
1031         pa_tagstruct_put(
1032                 t,
1033                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1034                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1035                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1036                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1037                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1038                 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1039                 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1040                 PA_TAG_INVALID);
1041     }
1042
1043     if (s->context->version >= 13) {
1044
1045         if (s->direction == PA_STREAM_PLAYBACK)
1046             pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1047         else
1048             pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1049
1050         pa_tagstruct_put(
1051                 t,
1052                 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1053                 PA_TAG_PROPLIST, s->proplist,
1054                 PA_TAG_INVALID);
1055
1056         if (s->direction == PA_STREAM_RECORD)
1057             pa_tagstruct_putu32(t, s->direct_on_input);
1058     }
1059
1060     if (s->context->version >= 14) {
1061
1062         if (s->direction == PA_STREAM_PLAYBACK)
1063             pa_tagstruct_put_boolean(t, volume_set);
1064
1065         pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1066     }
1067
1068     if (s->context->version >= 15) {
1069
1070         if (s->direction == PA_STREAM_PLAYBACK)
1071             pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1072
1073         pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1074         pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1075     }
1076
1077     pa_pstream_send_tagstruct(s->context->pstream, t);
1078     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1079
1080     pa_stream_set_state(s, PA_STREAM_CREATING);
1081
1082     pa_stream_unref(s);
1083     return 0;
1084 }
1085
1086 int pa_stream_connect_playback(
1087         pa_stream *s,
1088         const char *dev,
1089         const pa_buffer_attr *attr,
1090         pa_stream_flags_t flags,
1091         pa_cvolume *volume,
1092         pa_stream *sync_stream) {
1093
1094     pa_assert(s);
1095     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1096
1097     return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1098 }
1099
1100 int pa_stream_connect_record(
1101         pa_stream *s,
1102         const char *dev,
1103         const pa_buffer_attr *attr,
1104         pa_stream_flags_t flags) {
1105
1106     pa_assert(s);
1107     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1108
1109     return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1110 }
1111
1112 int pa_stream_write(
1113         pa_stream *s,
1114         const void *data,
1115         size_t length,
1116         void (*free_cb)(void *p),
1117         int64_t offset,
1118         pa_seek_mode_t seek) {
1119
1120     pa_memchunk chunk;
1121     pa_seek_mode_t t_seek;
1122     int64_t t_offset;
1123     size_t t_length;
1124     const void *t_data;
1125
1126     pa_assert(s);
1127     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1128     pa_assert(data);
1129
1130     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1131     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1132     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1133     PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1134     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1135
1136     if (length <= 0)
1137         return 0;
1138
1139     t_seek = seek;
1140     t_offset = offset;
1141     t_length = length;
1142     t_data = data;
1143
1144     while (t_length > 0) {
1145
1146         chunk.index = 0;
1147
1148         if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1149             chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1150             chunk.length = t_length;
1151         } else {
1152             void *d;
1153
1154             chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1155             chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1156
1157             d = pa_memblock_acquire(chunk.memblock);
1158             memcpy(d, t_data, chunk.length);
1159             pa_memblock_release(chunk.memblock);
1160         }
1161
1162         pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1163
1164         t_offset = 0;
1165         t_seek = PA_SEEK_RELATIVE;
1166
1167         t_data = (const uint8_t*) t_data + chunk.length;
1168         t_length -= chunk.length;
1169
1170         pa_memblock_unref(chunk.memblock);
1171     }
1172
1173     if (free_cb && pa_pstream_get_shm(s->context->pstream))
1174         free_cb((void*) data);
1175
1176     if (length < s->requested_bytes)
1177         s->requested_bytes -= (uint32_t) length;
1178     else
1179         s->requested_bytes = 0;
1180
1181     /* FIXME!!! ^^^ will break when offset is != 0 and mode is not RELATIVE*/
1182
1183     if (s->direction == PA_STREAM_PLAYBACK) {
1184
1185         /* Update latency request correction */
1186         if (s->write_index_corrections[s->current_write_index_correction].valid) {
1187
1188             if (seek == PA_SEEK_ABSOLUTE) {
1189                 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1190                 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1191                 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1192             } else if (seek == PA_SEEK_RELATIVE) {
1193                 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1194                     s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1195             } else
1196                 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1197         }
1198
1199         /* Update the write index in the already available latency data */
1200         if (s->timing_info_valid) {
1201
1202             if (seek == PA_SEEK_ABSOLUTE) {
1203                 s->timing_info.write_index_corrupt = FALSE;
1204                 s->timing_info.write_index = offset + (int64_t) length;
1205             } else if (seek == PA_SEEK_RELATIVE) {
1206                 if (!s->timing_info.write_index_corrupt)
1207                     s->timing_info.write_index += offset + (int64_t) length;
1208             } else
1209                 s->timing_info.write_index_corrupt = TRUE;
1210         }
1211
1212         if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1213             request_auto_timing_update(s, TRUE);
1214     }
1215
1216     return 0;
1217 }
1218
1219 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1220     pa_assert(s);
1221     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1222     pa_assert(data);
1223     pa_assert(length);
1224
1225     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1226     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1227     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1228
1229     if (!s->peek_memchunk.memblock) {
1230
1231         if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1232             *data = NULL;
1233             *length = 0;
1234             return 0;
1235         }
1236
1237         s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1238     }
1239
1240     pa_assert(s->peek_data);
1241     *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1242     *length = s->peek_memchunk.length;
1243     return 0;
1244 }
1245
1246 int pa_stream_drop(pa_stream *s) {
1247     pa_assert(s);
1248     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1249
1250     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1251     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1252     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1253     PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1254
1255     pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1256
1257     /* Fix the simulated local read index */
1258     if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1259         s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1260
1261     pa_assert(s->peek_data);
1262     pa_memblock_release(s->peek_memchunk.memblock);
1263     pa_memblock_unref(s->peek_memchunk.memblock);
1264     pa_memchunk_reset(&s->peek_memchunk);
1265
1266     return 0;
1267 }
1268
1269 size_t pa_stream_writable_size(pa_stream *s) {
1270     pa_assert(s);
1271     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1272
1273     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1274     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1275     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1276
1277     return s->requested_bytes;
1278 }
1279
1280 size_t pa_stream_readable_size(pa_stream *s) {
1281     pa_assert(s);
1282     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1283
1284     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1285     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1286     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1287
1288     return pa_memblockq_get_length(s->record_memblockq);
1289 }
1290
1291 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1292     pa_operation *o;
1293     pa_tagstruct *t;
1294     uint32_t tag;
1295
1296     pa_assert(s);
1297     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1298
1299     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1300     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1301     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1302
1303     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1304
1305     t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1306     pa_tagstruct_putu32(t, s->channel);
1307     pa_pstream_send_tagstruct(s->context->pstream, t);
1308     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1309
1310     return o;
1311 }
1312
1313 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1314     pa_usec_t usec;
1315
1316     pa_assert(s);
1317     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1318     pa_assert(s->state == PA_STREAM_READY);
1319     pa_assert(s->direction != PA_STREAM_UPLOAD);
1320     pa_assert(s->timing_info_valid);
1321     pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1322     pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1323
1324     if (s->direction == PA_STREAM_PLAYBACK) {
1325         /* The last byte that was written into the output device
1326          * had this time value associated */
1327         usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1328
1329         if (!s->corked && !s->suspended) {
1330
1331             if (!ignore_transport)
1332                 /* Because the latency info took a little time to come
1333                  * to us, we assume that the real output time is actually
1334                  * a little ahead */
1335                 usec += s->timing_info.transport_usec;
1336
1337             /* However, the output device usually maintains a buffer
1338                too, hence the real sample currently played is a little
1339                back  */
1340             if (s->timing_info.sink_usec >= usec)
1341                 usec = 0;
1342             else
1343                 usec -= s->timing_info.sink_usec;
1344         }
1345
1346     } else {
1347         pa_assert(s->direction == PA_STREAM_RECORD);
1348
1349         /* The last byte written into the server side queue had
1350          * this time value associated */
1351         usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1352
1353         if (!s->corked && !s->suspended) {
1354
1355             if (!ignore_transport)
1356                 /* Add transport latency */
1357                 usec += s->timing_info.transport_usec;
1358
1359             /* Add latency of data in device buffer */
1360             usec += s->timing_info.source_usec;
1361
1362             /* If this is a monitor source, we need to correct the
1363              * time by the playback device buffer */
1364             if (s->timing_info.sink_usec >= usec)
1365                 usec = 0;
1366             else
1367                 usec -= s->timing_info.sink_usec;
1368         }
1369     }
1370
1371     return usec;
1372 }
1373
1374 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1375     pa_operation *o = userdata;
1376     struct timeval local, remote, now;
1377     pa_timing_info *i;
1378     pa_bool_t playing = FALSE;
1379     uint64_t underrun_for = 0, playing_for = 0;
1380
1381     pa_assert(pd);
1382     pa_assert(o);
1383     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1384
1385     if (!o->context || !o->stream)
1386         goto finish;
1387
1388     i = &o->stream->timing_info;
1389
1390     o->stream->timing_info_valid = FALSE;
1391     i->write_index_corrupt = TRUE;
1392     i->read_index_corrupt = TRUE;
1393
1394     if (command != PA_COMMAND_REPLY) {
1395         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1396             goto finish;
1397
1398     } else {
1399
1400         if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1401             pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1402             pa_tagstruct_get_boolean(t, &playing) < 0 ||
1403             pa_tagstruct_get_timeval(t, &local) < 0 ||
1404             pa_tagstruct_get_timeval(t, &remote) < 0 ||
1405             pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1406             pa_tagstruct_gets64(t, &i->read_index) < 0) {
1407
1408             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1409             goto finish;
1410         }
1411
1412         if (o->context->version >= 13 &&
1413             o->stream->direction == PA_STREAM_PLAYBACK)
1414             if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1415                 pa_tagstruct_getu64(t, &playing_for) < 0) {
1416
1417                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1418                 goto finish;
1419             }
1420
1421
1422         if (!pa_tagstruct_eof(t)) {
1423             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1424             goto finish;
1425         }
1426         o->stream->timing_info_valid = TRUE;
1427         i->write_index_corrupt = FALSE;
1428         i->read_index_corrupt = FALSE;
1429
1430         i->playing = (int) playing;
1431         i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1432
1433         pa_gettimeofday(&now);
1434
1435         /* Calculcate timestamps */
1436         if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1437             /* local and remote seem to have synchronized clocks */
1438
1439             if (o->stream->direction == PA_STREAM_PLAYBACK)
1440                 i->transport_usec = pa_timeval_diff(&remote, &local);
1441             else
1442                 i->transport_usec = pa_timeval_diff(&now, &remote);
1443
1444             i->synchronized_clocks = TRUE;
1445             i->timestamp = remote;
1446         } else {
1447             /* clocks are not synchronized, let's estimate latency then */
1448             i->transport_usec = pa_timeval_diff(&now, &local)/2;
1449             i->synchronized_clocks = FALSE;
1450             i->timestamp = local;
1451             pa_timeval_add(&i->timestamp, i->transport_usec);
1452         }
1453
1454         /* Invalidate read and write indexes if necessary */
1455         if (tag < o->stream->read_index_not_before)
1456             i->read_index_corrupt = TRUE;
1457
1458         if (tag < o->stream->write_index_not_before)
1459             i->write_index_corrupt = TRUE;
1460
1461         if (o->stream->direction == PA_STREAM_PLAYBACK) {
1462             /* Write index correction */
1463
1464             int n, j;
1465             uint32_t ctag = tag;
1466
1467             /* Go through the saved correction values and add up the
1468              * total correction.*/
1469             for (n = 0, j = o->stream->current_write_index_correction+1;
1470                  n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1471                  n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1472
1473                 /* Step over invalid data or out-of-date data */
1474                 if (!o->stream->write_index_corrections[j].valid ||
1475                     o->stream->write_index_corrections[j].tag < ctag)
1476                     continue;
1477
1478                 /* Make sure that everything is in order */
1479                 ctag = o->stream->write_index_corrections[j].tag+1;
1480
1481                 /* Now fix the write index */
1482                 if (o->stream->write_index_corrections[j].corrupt) {
1483                     /* A corrupting seek was made */
1484                     i->write_index_corrupt = TRUE;
1485                 } else if (o->stream->write_index_corrections[j].absolute) {
1486                     /* An absolute seek was made */
1487                     i->write_index = o->stream->write_index_corrections[j].value;
1488                     i->write_index_corrupt = FALSE;
1489                 } else if (!i->write_index_corrupt) {
1490                     /* A relative seek was made */
1491                     i->write_index += o->stream->write_index_corrections[j].value;
1492                 }
1493             }
1494
1495             /* Clear old correction entries */
1496             for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1497                 if (!o->stream->write_index_corrections[n].valid)
1498                     continue;
1499
1500                 if (o->stream->write_index_corrections[n].tag <= tag)
1501                     o->stream->write_index_corrections[n].valid = FALSE;
1502             }
1503         }
1504
1505         if (o->stream->direction == PA_STREAM_RECORD) {
1506             /* Read index correction */
1507
1508             if (!i->read_index_corrupt)
1509                 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1510         }
1511
1512         /* Update smoother */
1513         if (o->stream->smoother) {
1514             pa_usec_t u, x;
1515
1516             u = x = pa_rtclock_usec() - i->transport_usec;
1517
1518             if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1519                 pa_usec_t su;
1520
1521                 /* If we weren't playing then it will take some time
1522                  * until the audio will actually come out through the
1523                  * speakers. Since we follow that timing here, we need
1524                  * to try to fix this up */
1525
1526                 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1527
1528                 if (su < i->sink_usec)
1529                     x += i->sink_usec - su;
1530             }
1531
1532             if (!i->playing)
1533                 pa_smoother_pause(o->stream->smoother, x);
1534
1535             /* Update the smoother */
1536             if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1537                 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1538                 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1539
1540             if (i->playing)
1541                 pa_smoother_resume(o->stream->smoother, x);
1542         }
1543     }
1544
1545     o->stream->auto_timing_update_requested = FALSE;
1546
1547     if (o->stream->latency_update_callback)
1548         o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1549
1550     if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1551         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1552         cb(o->stream, o->stream->timing_info_valid, o->userdata);
1553     }
1554
1555 finish:
1556
1557     pa_operation_done(o);
1558     pa_operation_unref(o);
1559 }
1560
1561 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1562     uint32_t tag;
1563     pa_operation *o;
1564     pa_tagstruct *t;
1565     struct timeval now;
1566     int cidx = 0;
1567
1568     pa_assert(s);
1569     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1570
1571     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1572     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1573     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1574
1575     if (s->direction == PA_STREAM_PLAYBACK) {
1576         /* Find a place to store the write_index correction data for this entry */
1577         cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1578
1579         /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1580         PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1581     }
1582     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1583
1584     t = pa_tagstruct_command(
1585             s->context,
1586             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1587             &tag);
1588     pa_tagstruct_putu32(t, s->channel);
1589     pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1590
1591     pa_pstream_send_tagstruct(s->context->pstream, t);
1592     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1593
1594     if (s->direction == PA_STREAM_PLAYBACK) {
1595         /* Fill in initial correction data */
1596
1597         s->current_write_index_correction = cidx;
1598
1599         s->write_index_corrections[cidx].valid = TRUE;
1600         s->write_index_corrections[cidx].absolute = FALSE;
1601         s->write_index_corrections[cidx].corrupt = FALSE;
1602         s->write_index_corrections[cidx].tag = tag;
1603         s->write_index_corrections[cidx].value = 0;
1604     }
1605
1606     return o;
1607 }
1608
1609 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1610     pa_stream *s = userdata;
1611
1612     pa_assert(pd);
1613     pa_assert(s);
1614     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1615
1616     pa_stream_ref(s);
1617
1618     if (command != PA_COMMAND_REPLY) {
1619         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1620             goto finish;
1621
1622         pa_stream_set_state(s, PA_STREAM_FAILED);
1623         goto finish;
1624     } else if (!pa_tagstruct_eof(t)) {
1625         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1626         goto finish;
1627     }
1628
1629     pa_stream_set_state(s, PA_STREAM_TERMINATED);
1630
1631 finish:
1632     pa_stream_unref(s);
1633 }
1634
1635 int pa_stream_disconnect(pa_stream *s) {
1636     pa_tagstruct *t;
1637     uint32_t tag;
1638
1639     pa_assert(s);
1640     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1641
1642     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1643     PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1644     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1645
1646     pa_stream_ref(s);
1647
1648     t = pa_tagstruct_command(
1649             s->context,
1650             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1651                         (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1652             &tag);
1653     pa_tagstruct_putu32(t, s->channel);
1654     pa_pstream_send_tagstruct(s->context->pstream, t);
1655     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1656
1657     pa_stream_unref(s);
1658     return 0;
1659 }
1660
1661 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1662     pa_assert(s);
1663     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1664
1665     if (pa_detect_fork())
1666         return;
1667
1668     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1669         return;
1670
1671     s->read_callback = cb;
1672     s->read_userdata = userdata;
1673 }
1674
1675 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1676     pa_assert(s);
1677     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1678
1679     if (pa_detect_fork())
1680         return;
1681
1682     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1683         return;
1684
1685     s->write_callback = cb;
1686     s->write_userdata = userdata;
1687 }
1688
1689 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1690     pa_assert(s);
1691     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1692
1693     if (pa_detect_fork())
1694         return;
1695
1696     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1697         return;
1698
1699     s->state_callback = cb;
1700     s->state_userdata = userdata;
1701 }
1702
1703 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1704     pa_assert(s);
1705     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1706
1707     if (pa_detect_fork())
1708         return;
1709
1710     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1711         return;
1712
1713     s->overflow_callback = cb;
1714     s->overflow_userdata = userdata;
1715 }
1716
1717 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1718     pa_assert(s);
1719     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1720
1721     if (pa_detect_fork())
1722         return;
1723
1724     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1725         return;
1726
1727     s->underflow_callback = cb;
1728     s->underflow_userdata = userdata;
1729 }
1730
1731 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1732     pa_assert(s);
1733     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1734
1735     if (pa_detect_fork())
1736         return;
1737
1738     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1739         return;
1740
1741     s->latency_update_callback = cb;
1742     s->latency_update_userdata = userdata;
1743 }
1744
1745 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1746     pa_assert(s);
1747     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1748
1749     if (pa_detect_fork())
1750         return;
1751
1752     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1753         return;
1754
1755     s->moved_callback = cb;
1756     s->moved_userdata = userdata;
1757 }
1758
1759 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1760     pa_assert(s);
1761     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1762
1763     if (pa_detect_fork())
1764         return;
1765
1766     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1767         return;
1768
1769     s->suspended_callback = cb;
1770     s->suspended_userdata = userdata;
1771 }
1772
1773 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1774     pa_assert(s);
1775     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1776
1777     if (pa_detect_fork())
1778         return;
1779
1780     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1781         return;
1782
1783     s->started_callback = cb;
1784     s->started_userdata = userdata;
1785 }
1786
1787 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
1788     pa_assert(s);
1789     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1790
1791     if (pa_detect_fork())
1792         return;
1793
1794     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1795         return;
1796
1797     s->event_callback = cb;
1798     s->event_userdata = userdata;
1799 }
1800
1801 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1802     pa_operation *o = userdata;
1803     int success = 1;
1804
1805     pa_assert(pd);
1806     pa_assert(o);
1807     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1808
1809     if (!o->context)
1810         goto finish;
1811
1812     if (command != PA_COMMAND_REPLY) {
1813         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1814             goto finish;
1815
1816         success = 0;
1817     } else if (!pa_tagstruct_eof(t)) {
1818         pa_context_fail(o->context, PA_ERR_PROTOCOL);
1819         goto finish;
1820     }
1821
1822     if (o->callback) {
1823         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1824         cb(o->stream, success, o->userdata);
1825     }
1826
1827 finish:
1828     pa_operation_done(o);
1829     pa_operation_unref(o);
1830 }
1831
1832 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
1833     pa_operation *o;
1834     pa_tagstruct *t;
1835     uint32_t tag;
1836
1837     pa_assert(s);
1838     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1839
1840     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1841     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1842     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1843
1844     s->corked = b;
1845
1846     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1847
1848     t = pa_tagstruct_command(
1849             s->context,
1850             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
1851             &tag);
1852     pa_tagstruct_putu32(t, s->channel);
1853     pa_tagstruct_put_boolean(t, !!b);
1854     pa_pstream_send_tagstruct(s->context->pstream, t);
1855     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1856
1857     check_smoother_status(s, FALSE, FALSE, FALSE);
1858
1859     /* This might cause the indexes to hang/start again, hence
1860      * let's request a timing update */
1861     request_auto_timing_update(s, TRUE);
1862
1863     return o;
1864 }
1865
1866 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
1867     pa_tagstruct *t;
1868     pa_operation *o;
1869     uint32_t tag;
1870
1871     pa_assert(s);
1872     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1873
1874     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1875     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1876
1877     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1878
1879     t = pa_tagstruct_command(s->context, command, &tag);
1880     pa_tagstruct_putu32(t, s->channel);
1881     pa_pstream_send_tagstruct(s->context->pstream, t);
1882     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1883
1884     return o;
1885 }
1886
1887 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1888     pa_operation *o;
1889
1890     pa_assert(s);
1891     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1892
1893     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1894     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1895     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1896
1897     if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
1898         return NULL;
1899
1900     if (s->direction == PA_STREAM_PLAYBACK) {
1901
1902         if (s->write_index_corrections[s->current_write_index_correction].valid)
1903             s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1904
1905         if (s->buffer_attr.prebuf > 0)
1906             check_smoother_status(s, FALSE, FALSE, TRUE);
1907
1908         /* This will change the write index, but leave the
1909          * read index untouched. */
1910         invalidate_indexes(s, FALSE, TRUE);
1911
1912     } else
1913         /* For record streams this has no influence on the write
1914          * index, but the read index might jump. */
1915         invalidate_indexes(s, TRUE, FALSE);
1916
1917     return o;
1918 }
1919
1920 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1921     pa_operation *o;
1922
1923     pa_assert(s);
1924     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1925
1926     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1927     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1928     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1929     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
1930
1931     if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
1932         return NULL;
1933
1934     /* This might cause the read index to hang again, hence
1935      * let's request a timing update */
1936     request_auto_timing_update(s, TRUE);
1937
1938     return o;
1939 }
1940
1941 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1942     pa_operation *o;
1943
1944     pa_assert(s);
1945     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1946
1947     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1948     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1949     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1950     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
1951
1952     if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
1953         return NULL;
1954
1955     /* This might cause the read index to start moving again, hence
1956      * let's request a timing update */
1957     request_auto_timing_update(s, TRUE);
1958
1959     return o;
1960 }
1961
1962 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
1963     pa_operation *o;
1964
1965     pa_assert(s);
1966     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1967     pa_assert(name);
1968
1969     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1970     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1971     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1972
1973     if (s->context->version >= 13) {
1974         pa_proplist *p = pa_proplist_new();
1975
1976         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
1977         o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
1978         pa_proplist_free(p);
1979     } else {
1980         pa_tagstruct *t;
1981         uint32_t tag;
1982
1983         o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1984         t = pa_tagstruct_command(
1985                 s->context,
1986                 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
1987                 &tag);
1988         pa_tagstruct_putu32(t, s->channel);
1989         pa_tagstruct_puts(t, name);
1990         pa_pstream_send_tagstruct(s->context->pstream, t);
1991         pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1992     }
1993
1994     return o;
1995 }
1996
1997 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
1998     pa_usec_t usec;
1999
2000     pa_assert(s);
2001     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2002
2003     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2004     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2005     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2006     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2007     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2008     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2009
2010     if (s->smoother)
2011         usec = pa_smoother_get(s->smoother, pa_rtclock_usec());
2012     else
2013         usec = calc_time(s, FALSE);
2014
2015     /* Make sure the time runs monotonically */
2016     if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2017         if (usec < s->previous_time)
2018             usec = s->previous_time;
2019         else
2020             s->previous_time = usec;
2021     }
2022
2023     if (r_usec)
2024         *r_usec = usec;
2025
2026     return 0;
2027 }
2028
2029 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2030     pa_assert(s);
2031     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2032
2033     if (negative)
2034         *negative = 0;
2035
2036     if (a >= b)
2037         return a-b;
2038     else {
2039         if (negative && s->direction == PA_STREAM_RECORD) {
2040             *negative = 1;
2041             return b-a;
2042         } else
2043             return 0;
2044     }
2045 }
2046
2047 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2048     pa_usec_t t, c;
2049     int r;
2050     int64_t cindex;
2051
2052     pa_assert(s);
2053     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2054     pa_assert(r_usec);
2055
2056     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2057     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2058     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2059     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2060     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2061     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2062
2063     if ((r = pa_stream_get_time(s, &t)) < 0)
2064         return r;
2065
2066     if (s->direction == PA_STREAM_PLAYBACK)
2067         cindex = s->timing_info.write_index;
2068     else
2069         cindex = s->timing_info.read_index;
2070
2071     if (cindex < 0)
2072         cindex = 0;
2073
2074     c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2075
2076     if (s->direction == PA_STREAM_PLAYBACK)
2077         *r_usec = time_counter_diff(s, c, t, negative);
2078     else
2079         *r_usec = time_counter_diff(s, t, c, negative);
2080
2081     return 0;
2082 }
2083
2084 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2085     pa_assert(s);
2086     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2087
2088     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2089     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2090     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2091     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2092
2093     return &s->timing_info;
2094 }
2095
2096 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2097     pa_assert(s);
2098     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2099
2100     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2101
2102     return &s->sample_spec;
2103 }
2104
2105 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2106     pa_assert(s);
2107     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2108
2109     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2110
2111     return &s->channel_map;
2112 }
2113
2114 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2115     pa_assert(s);
2116     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2117
2118     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2119     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2120     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2121
2122     return &s->buffer_attr;
2123 }
2124
2125 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2126     pa_operation *o = userdata;
2127     int success = 1;
2128
2129     pa_assert(pd);
2130     pa_assert(o);
2131     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2132
2133     if (!o->context)
2134         goto finish;
2135
2136     if (command != PA_COMMAND_REPLY) {
2137         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2138             goto finish;
2139
2140         success = 0;
2141     } else {
2142         if (o->stream->direction == PA_STREAM_PLAYBACK) {
2143             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2144                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2145                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2146                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2147                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2148                 goto finish;
2149             }
2150         } else if (o->stream->direction == PA_STREAM_RECORD) {
2151             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2152                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2153                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2154                 goto finish;
2155             }
2156         }
2157
2158         if (o->stream->context->version >= 13) {
2159             pa_usec_t usec;
2160
2161             if (pa_tagstruct_get_usec(t, &usec) < 0) {
2162                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2163                 goto finish;
2164             }
2165
2166             if (o->stream->direction == PA_STREAM_RECORD)
2167                 o->stream->timing_info.configured_source_usec = usec;
2168             else
2169                 o->stream->timing_info.configured_sink_usec = usec;
2170         }
2171
2172         if (!pa_tagstruct_eof(t)) {
2173             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2174             goto finish;
2175         }
2176     }
2177
2178     if (o->callback) {
2179         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2180         cb(o->stream, success, o->userdata);
2181     }
2182
2183 finish:
2184     pa_operation_done(o);
2185     pa_operation_unref(o);
2186 }
2187
2188
2189 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2190     pa_operation *o;
2191     pa_tagstruct *t;
2192     uint32_t tag;
2193
2194     pa_assert(s);
2195     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2196     pa_assert(attr);
2197
2198     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2199     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2200     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2201     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2202
2203     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2204
2205     t = pa_tagstruct_command(
2206             s->context,
2207             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2208             &tag);
2209     pa_tagstruct_putu32(t, s->channel);
2210
2211     pa_tagstruct_putu32(t, attr->maxlength);
2212
2213     if (s->direction == PA_STREAM_PLAYBACK)
2214         pa_tagstruct_put(
2215                 t,
2216                 PA_TAG_U32, attr->tlength,
2217                 PA_TAG_U32, attr->prebuf,
2218                 PA_TAG_U32, attr->minreq,
2219                 PA_TAG_INVALID);
2220     else
2221         pa_tagstruct_putu32(t, attr->fragsize);
2222
2223     if (s->context->version >= 13)
2224         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2225
2226     if (s->context->version >= 14)
2227         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2228
2229     pa_pstream_send_tagstruct(s->context->pstream, t);
2230     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2231
2232     /* This might cause changes in the read/write indexex, hence let's
2233      * request a timing update */
2234     request_auto_timing_update(s, TRUE);
2235
2236     return o;
2237 }
2238
2239 uint32_t pa_stream_get_device_index(pa_stream *s) {
2240     pa_assert(s);
2241     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2242
2243     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2244     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2245     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2246     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2247     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2248
2249     return s->device_index;
2250 }
2251
2252 const char *pa_stream_get_device_name(pa_stream *s) {
2253     pa_assert(s);
2254     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2255
2256     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2257     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2258     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2259     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2260     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2261
2262     return s->device_name;
2263 }
2264
2265 int pa_stream_is_suspended(pa_stream *s) {
2266     pa_assert(s);
2267     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2268
2269     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2270     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2271     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2272     PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2273
2274     return s->suspended;
2275 }
2276
2277 int pa_stream_is_corked(pa_stream *s) {
2278     pa_assert(s);
2279     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2280
2281     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2282     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2283     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2284
2285     return s->corked;
2286 }
2287
2288 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2289     pa_operation *o = userdata;
2290     int success = 1;
2291
2292     pa_assert(pd);
2293     pa_assert(o);
2294     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2295
2296     if (!o->context)
2297         goto finish;
2298
2299     if (command != PA_COMMAND_REPLY) {
2300         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2301             goto finish;
2302
2303         success = 0;
2304     } else {
2305
2306         if (!pa_tagstruct_eof(t)) {
2307             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2308             goto finish;
2309         }
2310     }
2311
2312     o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2313     pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2314
2315     if (o->callback) {
2316         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2317         cb(o->stream, success, o->userdata);
2318     }
2319
2320 finish:
2321     pa_operation_done(o);
2322     pa_operation_unref(o);
2323 }
2324
2325
2326 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2327     pa_operation *o;
2328     pa_tagstruct *t;
2329     uint32_t tag;
2330
2331     pa_assert(s);
2332     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2333
2334     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2335     PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2336     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2337     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2338     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2339     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2340
2341     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2342     o->private = PA_UINT_TO_PTR(rate);
2343
2344     t = pa_tagstruct_command(
2345             s->context,
2346             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2347             &tag);
2348     pa_tagstruct_putu32(t, s->channel);
2349     pa_tagstruct_putu32(t, rate);
2350
2351     pa_pstream_send_tagstruct(s->context->pstream, t);
2352     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2353
2354     return o;
2355 }
2356
2357 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2358     pa_operation *o;
2359     pa_tagstruct *t;
2360     uint32_t tag;
2361
2362     pa_assert(s);
2363     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2364
2365     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2366     PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2367     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2368     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2369     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2370
2371     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2372
2373     t = pa_tagstruct_command(
2374             s->context,
2375             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2376             &tag);
2377     pa_tagstruct_putu32(t, s->channel);
2378     pa_tagstruct_putu32(t, (uint32_t) mode);
2379     pa_tagstruct_put_proplist(t, p);
2380
2381     pa_pstream_send_tagstruct(s->context->pstream, t);
2382     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2383
2384     /* Please note that we don't update s->proplist here, because we
2385      * don't export that field */
2386
2387     return o;
2388 }
2389
2390 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2391     pa_operation *o;
2392     pa_tagstruct *t;
2393     uint32_t tag;
2394     const char * const*k;
2395
2396     pa_assert(s);
2397     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2398
2399     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2400     PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2401     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2402     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2403     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2404
2405     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2406
2407     t = pa_tagstruct_command(
2408             s->context,
2409             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2410             &tag);
2411     pa_tagstruct_putu32(t, s->channel);
2412
2413     for (k = keys; *k; k++)
2414         pa_tagstruct_puts(t, *k);
2415
2416     pa_tagstruct_puts(t, NULL);
2417
2418     pa_pstream_send_tagstruct(s->context->pstream, t);
2419     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2420
2421     /* Please note that we don't update s->proplist here, because we
2422      * don't export that field */
2423
2424     return o;
2425 }
2426
2427 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2428     pa_assert(s);
2429     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2430
2431     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2432     PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2433     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2434     PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2435
2436     s->direct_on_input = sink_input_idx;
2437
2438     return 0;
2439 }
2440
2441 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2442     pa_assert(s);
2443     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2444
2445     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2446     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2447     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2448
2449     return s->direct_on_input;
2450 }