Merge commit '2d903bae9a2e57f997a3d3f335379c3880f95c77'
[profile/ivi/pulseaudio.git] / src / pulse / stream.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/xmalloc.h>
34
35 #include <pulsecore/pstream-util.h>
36 #include <pulsecore/log.h>
37 #include <pulsecore/hashmap.h>
38 #include <pulsecore/macro.h>
39 #include <pulsecore/rtclock.h>
40
41 #include "fork-detect.h"
42 #include "internal.h"
43
44 #define LATENCY_IPOL_INTERVAL_USEC (333*PA_USEC_PER_MSEC)
45
46 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
47 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
48 #define SMOOTHER_MIN_HISTORY (4)
49
50 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
51     return pa_stream_new_with_proplist(c, name, ss, map, NULL);
52 }
53
54 static void reset_callbacks(pa_stream *s) {
55     s->read_callback = NULL;
56     s->read_userdata = NULL;
57     s->write_callback = NULL;
58     s->write_userdata = NULL;
59     s->state_callback = NULL;
60     s->state_userdata = NULL;
61     s->overflow_callback = NULL;
62     s->overflow_userdata = NULL;
63     s->underflow_callback = NULL;
64     s->underflow_userdata = NULL;
65     s->latency_update_callback = NULL;
66     s->latency_update_userdata = NULL;
67     s->moved_callback = NULL;
68     s->moved_userdata = NULL;
69     s->suspended_callback = NULL;
70     s->suspended_userdata = NULL;
71     s->started_callback = NULL;
72     s->started_userdata = NULL;
73     s->event_callback = NULL;
74     s->event_userdata = NULL;
75 }
76
77 pa_stream *pa_stream_new_with_proplist(
78         pa_context *c,
79         const char *name,
80         const pa_sample_spec *ss,
81         const pa_channel_map *map,
82         pa_proplist *p) {
83
84     pa_stream *s;
85     int i;
86     pa_channel_map tmap;
87
88     pa_assert(c);
89     pa_assert(PA_REFCNT_VALUE(c) >= 1);
90
91     PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
92     PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
93     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
94     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
95     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
96     PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
97     PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
98
99     if (!map)
100         PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
101
102     s = pa_xnew(pa_stream, 1);
103     PA_REFCNT_INIT(s);
104     s->context = c;
105     s->mainloop = c->mainloop;
106
107     s->direction = PA_STREAM_NODIRECTION;
108     s->state = PA_STREAM_UNCONNECTED;
109     s->flags = 0;
110
111     s->sample_spec = *ss;
112     s->channel_map = *map;
113
114     s->direct_on_input = PA_INVALID_INDEX;
115
116     s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
117     if (name)
118         pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
119
120     s->channel = 0;
121     s->channel_valid = FALSE;
122     s->syncid = c->csyncid++;
123     s->stream_index = PA_INVALID_INDEX;
124
125     s->requested_bytes = 0;
126     memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
127
128     /* We initialize der target length here, so that if the user
129      * passes no explicit buffering metrics the default is similar to
130      * what older PA versions provided. */
131
132     s->buffer_attr.maxlength = (uint32_t) -1;
133     s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
134     s->buffer_attr.minreq = (uint32_t) -1;
135     s->buffer_attr.prebuf = (uint32_t) -1;
136     s->buffer_attr.fragsize = (uint32_t) -1;
137
138     s->device_index = PA_INVALID_INDEX;
139     s->device_name = NULL;
140     s->suspended = FALSE;
141
142     pa_memchunk_reset(&s->peek_memchunk);
143     s->peek_data = NULL;
144
145     s->record_memblockq = NULL;
146
147     s->corked = FALSE;
148
149     memset(&s->timing_info, 0, sizeof(s->timing_info));
150     s->timing_info_valid = FALSE;
151
152     s->previous_time = 0;
153
154     s->read_index_not_before = 0;
155     s->write_index_not_before = 0;
156     for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
157         s->write_index_corrections[i].valid = 0;
158     s->current_write_index_correction = 0;
159
160     s->auto_timing_update_event = NULL;
161     s->auto_timing_update_requested = FALSE;
162
163     reset_callbacks(s);
164
165     s->smoother = NULL;
166
167     /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
168     PA_LLIST_PREPEND(pa_stream, c->streams, s);
169     pa_stream_ref(s);
170
171     return s;
172 }
173
174 static void stream_unlink(pa_stream *s) {
175     pa_operation *o, *n;
176     pa_assert(s);
177
178     if (!s->context)
179         return;
180
181     /* Detach from context */
182
183     /* Unref all operatio object that point to us */
184     for (o = s->context->operations; o; o = n) {
185         n = o->next;
186
187         if (o->stream == s)
188             pa_operation_cancel(o);
189     }
190
191     /* Drop all outstanding replies for this stream */
192     if (s->context->pdispatch)
193         pa_pdispatch_unregister_reply(s->context->pdispatch, s);
194
195     if (s->channel_valid) {
196         pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL);
197         s->channel = 0;
198         s->channel_valid = FALSE;
199     }
200
201     PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
202     pa_stream_unref(s);
203
204     s->context = NULL;
205
206     if (s->auto_timing_update_event) {
207         pa_assert(s->mainloop);
208         s->mainloop->time_free(s->auto_timing_update_event);
209     }
210
211     reset_callbacks(s);
212 }
213
214 static void stream_free(pa_stream *s) {
215     pa_assert(s);
216
217     stream_unlink(s);
218
219     if (s->peek_memchunk.memblock) {
220         if (s->peek_data)
221             pa_memblock_release(s->peek_memchunk.memblock);
222         pa_memblock_unref(s->peek_memchunk.memblock);
223     }
224
225     if (s->record_memblockq)
226         pa_memblockq_free(s->record_memblockq);
227
228     if (s->proplist)
229         pa_proplist_free(s->proplist);
230
231     if (s->smoother)
232         pa_smoother_free(s->smoother);
233
234     pa_xfree(s->device_name);
235     pa_xfree(s);
236 }
237
238 void pa_stream_unref(pa_stream *s) {
239     pa_assert(s);
240     pa_assert(PA_REFCNT_VALUE(s) >= 1);
241
242     if (PA_REFCNT_DEC(s) <= 0)
243         stream_free(s);
244 }
245
246 pa_stream* pa_stream_ref(pa_stream *s) {
247     pa_assert(s);
248     pa_assert(PA_REFCNT_VALUE(s) >= 1);
249
250     PA_REFCNT_INC(s);
251     return s;
252 }
253
254 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
255     pa_assert(s);
256     pa_assert(PA_REFCNT_VALUE(s) >= 1);
257
258     return s->state;
259 }
260
261 pa_context* pa_stream_get_context(pa_stream *s) {
262     pa_assert(s);
263     pa_assert(PA_REFCNT_VALUE(s) >= 1);
264
265     return s->context;
266 }
267
268 uint32_t pa_stream_get_index(pa_stream *s) {
269     pa_assert(s);
270     pa_assert(PA_REFCNT_VALUE(s) >= 1);
271
272     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
273     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
274
275     return s->stream_index;
276 }
277
278 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
279     pa_assert(s);
280     pa_assert(PA_REFCNT_VALUE(s) >= 1);
281
282     if (s->state == st)
283         return;
284
285     pa_stream_ref(s);
286
287     s->state = st;
288
289     if (s->state_callback)
290         s->state_callback(s, s->state_userdata);
291
292     if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
293         stream_unlink(s);
294
295     pa_stream_unref(s);
296 }
297
298 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
299     pa_assert(s);
300     pa_assert(PA_REFCNT_VALUE(s) >= 1);
301
302     if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
303         return;
304
305     if (s->state == PA_STREAM_READY &&
306         (force || !s->auto_timing_update_requested)) {
307         pa_operation *o;
308
309 /*         pa_log("automatically requesting new timing data"); */
310
311         if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
312             pa_operation_unref(o);
313             s->auto_timing_update_requested = TRUE;
314         }
315     }
316
317     if (s->auto_timing_update_event) {
318         struct timeval next;
319         pa_gettimeofday(&next);
320         pa_timeval_add(&next, LATENCY_IPOL_INTERVAL_USEC);
321         s->mainloop->time_restart(s->auto_timing_update_event, &next);
322     }
323 }
324
325 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
326     pa_context *c = userdata;
327     pa_stream *s;
328     uint32_t channel;
329
330     pa_assert(pd);
331     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
332     pa_assert(t);
333     pa_assert(c);
334     pa_assert(PA_REFCNT_VALUE(c) >= 1);
335
336     pa_context_ref(c);
337
338     if (pa_tagstruct_getu32(t, &channel) < 0 ||
339         !pa_tagstruct_eof(t)) {
340         pa_context_fail(c, PA_ERR_PROTOCOL);
341         goto finish;
342     }
343
344     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel)))
345         goto finish;
346
347     if (s->state != PA_STREAM_READY)
348         goto finish;
349
350     pa_context_set_error(c, PA_ERR_KILLED);
351     pa_stream_set_state(s, PA_STREAM_FAILED);
352
353 finish:
354     pa_context_unref(c);
355 }
356
357 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
358     pa_usec_t x;
359
360     pa_assert(s);
361     pa_assert(!force_start || !force_stop);
362
363     if (!s->smoother)
364         return;
365
366     x = pa_rtclock_usec();
367
368     if (s->timing_info_valid) {
369         if (aposteriori)
370             x -= s->timing_info.transport_usec;
371         else
372             x += s->timing_info.transport_usec;
373
374         if (s->direction == PA_STREAM_PLAYBACK)
375             /* it takes a while until the pause/resume is actually
376              * audible */
377             x += s->timing_info.sink_usec;
378         else
379             /* Data froma  while back will be dropped */
380             x -= s->timing_info.source_usec;
381     }
382
383     if (s->suspended || s->corked || force_stop)
384         pa_smoother_pause(s->smoother, x);
385     else if (force_start || s->buffer_attr.prebuf == 0)
386         pa_smoother_resume(s->smoother, x);
387
388     /* Please note that we have no idea if playback actually started
389      * if prebuf is non-zero! */
390 }
391
392 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
393     pa_context *c = userdata;
394     pa_stream *s;
395     uint32_t channel;
396     const char *dn;
397     pa_bool_t suspended;
398     uint32_t di;
399     pa_usec_t usec;
400     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
401
402     pa_assert(pd);
403     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
404     pa_assert(t);
405     pa_assert(c);
406     pa_assert(PA_REFCNT_VALUE(c) >= 1);
407
408     pa_context_ref(c);
409
410     if (c->version < 12) {
411         pa_context_fail(c, PA_ERR_PROTOCOL);
412         goto finish;
413     }
414
415     if (pa_tagstruct_getu32(t, &channel) < 0 ||
416         pa_tagstruct_getu32(t, &di) < 0 ||
417         pa_tagstruct_gets(t, &dn) < 0 ||
418         pa_tagstruct_get_boolean(t, &suspended) < 0) {
419         pa_context_fail(c, PA_ERR_PROTOCOL);
420         goto finish;
421     }
422
423     if (c->version >= 13) {
424
425         if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
426             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
427                 pa_tagstruct_getu32(t, &fragsize) < 0 ||
428                 pa_tagstruct_get_usec(t, &usec) < 0) {
429                 pa_context_fail(c, PA_ERR_PROTOCOL);
430                 goto finish;
431             }
432         } else {
433             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
434                 pa_tagstruct_getu32(t, &tlength) < 0 ||
435                 pa_tagstruct_getu32(t, &prebuf) < 0 ||
436                 pa_tagstruct_getu32(t, &minreq) < 0 ||
437                 pa_tagstruct_get_usec(t, &usec) < 0) {
438                 pa_context_fail(c, PA_ERR_PROTOCOL);
439                 goto finish;
440             }
441         }
442     }
443
444     if (!pa_tagstruct_eof(t)) {
445         pa_context_fail(c, PA_ERR_PROTOCOL);
446         goto finish;
447     }
448
449     if (!dn || di == PA_INVALID_INDEX) {
450         pa_context_fail(c, PA_ERR_PROTOCOL);
451         goto finish;
452     }
453
454     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, channel)))
455         goto finish;
456
457     if (s->state != PA_STREAM_READY)
458         goto finish;
459
460     if (c->version >= 13) {
461         if (s->direction == PA_STREAM_RECORD)
462             s->timing_info.configured_source_usec = usec;
463         else
464             s->timing_info.configured_sink_usec = usec;
465
466         s->buffer_attr.maxlength = maxlength;
467         s->buffer_attr.fragsize = fragsize;
468         s->buffer_attr.tlength = tlength;
469         s->buffer_attr.prebuf = prebuf;
470         s->buffer_attr.minreq = minreq;
471     }
472
473     pa_xfree(s->device_name);
474     s->device_name = pa_xstrdup(dn);
475     s->device_index = di;
476
477     s->suspended = suspended;
478
479     check_smoother_status(s, TRUE, FALSE, FALSE);
480     request_auto_timing_update(s, TRUE);
481
482     if (s->moved_callback)
483         s->moved_callback(s, s->moved_userdata);
484
485 finish:
486     pa_context_unref(c);
487 }
488
489 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
490     pa_context *c = userdata;
491     pa_stream *s;
492     uint32_t channel;
493     pa_bool_t suspended;
494
495     pa_assert(pd);
496     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
497     pa_assert(t);
498     pa_assert(c);
499     pa_assert(PA_REFCNT_VALUE(c) >= 1);
500
501     pa_context_ref(c);
502
503     if (c->version < 12) {
504         pa_context_fail(c, PA_ERR_PROTOCOL);
505         goto finish;
506     }
507
508     if (pa_tagstruct_getu32(t, &channel) < 0 ||
509         pa_tagstruct_get_boolean(t, &suspended) < 0 ||
510         !pa_tagstruct_eof(t)) {
511         pa_context_fail(c, PA_ERR_PROTOCOL);
512         goto finish;
513     }
514
515     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, channel)))
516         goto finish;
517
518     if (s->state != PA_STREAM_READY)
519         goto finish;
520
521     s->suspended = suspended;
522
523     check_smoother_status(s, TRUE, FALSE, FALSE);
524     request_auto_timing_update(s, TRUE);
525
526     if (s->suspended_callback)
527         s->suspended_callback(s, s->suspended_userdata);
528
529 finish:
530     pa_context_unref(c);
531 }
532
533 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
534     pa_context *c = userdata;
535     pa_stream *s;
536     uint32_t channel;
537
538     pa_assert(pd);
539     pa_assert(command == PA_COMMAND_STARTED);
540     pa_assert(t);
541     pa_assert(c);
542     pa_assert(PA_REFCNT_VALUE(c) >= 1);
543
544     pa_context_ref(c);
545
546     if (c->version < 13) {
547         pa_context_fail(c, PA_ERR_PROTOCOL);
548         goto finish;
549     }
550
551     if (pa_tagstruct_getu32(t, &channel) < 0 ||
552         !pa_tagstruct_eof(t)) {
553         pa_context_fail(c, PA_ERR_PROTOCOL);
554         goto finish;
555     }
556
557     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
558         goto finish;
559
560     if (s->state != PA_STREAM_READY)
561         goto finish;
562
563     check_smoother_status(s, TRUE, TRUE, FALSE);
564     request_auto_timing_update(s, TRUE);
565
566     if (s->started_callback)
567         s->started_callback(s, s->started_userdata);
568
569 finish:
570     pa_context_unref(c);
571 }
572
573 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
574     pa_context *c = userdata;
575     pa_stream *s;
576     uint32_t channel;
577     pa_proplist *pl = NULL;
578     const char *event;
579
580     pa_assert(pd);
581     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
582     pa_assert(t);
583     pa_assert(c);
584     pa_assert(PA_REFCNT_VALUE(c) >= 1);
585
586     pa_context_ref(c);
587
588     if (c->version < 15) {
589         pa_context_fail(c, PA_ERR_PROTOCOL);
590         goto finish;
591     }
592
593     pl = pa_proplist_new();
594
595     if (pa_tagstruct_getu32(t, &channel) < 0 ||
596         pa_tagstruct_gets(t, &event) < 0 ||
597         pa_tagstruct_get_proplist(t, pl) < 0 ||
598         !pa_tagstruct_eof(t) || !event) {
599         pa_context_fail(c, PA_ERR_PROTOCOL);
600         goto finish;
601     }
602
603     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, channel)))
604         goto finish;
605
606     if (s->state != PA_STREAM_READY)
607         goto finish;
608
609     if (s->event_callback)
610         s->event_callback(s, event, pl, s->event_userdata);
611
612 finish:
613     pa_context_unref(c);
614
615     if (pl)
616         pa_proplist_free(pl);
617 }
618
619 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
620     pa_stream *s;
621     pa_context *c = userdata;
622     uint32_t bytes, channel;
623
624     pa_assert(pd);
625     pa_assert(command == PA_COMMAND_REQUEST);
626     pa_assert(t);
627     pa_assert(c);
628     pa_assert(PA_REFCNT_VALUE(c) >= 1);
629
630     pa_context_ref(c);
631
632     if (pa_tagstruct_getu32(t, &channel) < 0 ||
633         pa_tagstruct_getu32(t, &bytes) < 0 ||
634         !pa_tagstruct_eof(t)) {
635         pa_context_fail(c, PA_ERR_PROTOCOL);
636         goto finish;
637     }
638
639     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
640         goto finish;
641
642     if (s->state != PA_STREAM_READY)
643         goto finish;
644
645     s->requested_bytes += bytes;
646
647     if (s->requested_bytes > 0 && s->write_callback)
648         s->write_callback(s, s->requested_bytes, s->write_userdata);
649
650 finish:
651     pa_context_unref(c);
652 }
653
654 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
655     pa_stream *s;
656     pa_context *c = userdata;
657     uint32_t channel;
658
659     pa_assert(pd);
660     pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
661     pa_assert(t);
662     pa_assert(c);
663     pa_assert(PA_REFCNT_VALUE(c) >= 1);
664
665     pa_context_ref(c);
666
667     if (pa_tagstruct_getu32(t, &channel) < 0 ||
668         !pa_tagstruct_eof(t)) {
669         pa_context_fail(c, PA_ERR_PROTOCOL);
670         goto finish;
671     }
672
673     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
674         goto finish;
675
676     if (s->state != PA_STREAM_READY)
677         goto finish;
678
679     if (s->buffer_attr.prebuf > 0)
680         check_smoother_status(s, TRUE, FALSE, TRUE);
681
682     request_auto_timing_update(s, TRUE);
683
684     if (command == PA_COMMAND_OVERFLOW) {
685         if (s->overflow_callback)
686             s->overflow_callback(s, s->overflow_userdata);
687     } else if (command == PA_COMMAND_UNDERFLOW) {
688         if (s->underflow_callback)
689             s->underflow_callback(s, s->underflow_userdata);
690     }
691
692  finish:
693     pa_context_unref(c);
694 }
695
696 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
697     pa_assert(s);
698     pa_assert(PA_REFCNT_VALUE(s) >= 1);
699
700 /*     pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
701
702     if (s->state != PA_STREAM_READY)
703         return;
704
705     if (w) {
706         s->write_index_not_before = s->context->ctag;
707
708         if (s->timing_info_valid)
709             s->timing_info.write_index_corrupt = TRUE;
710
711 /*         pa_log("write_index invalidated"); */
712     }
713
714     if (r) {
715         s->read_index_not_before = s->context->ctag;
716
717         if (s->timing_info_valid)
718             s->timing_info.read_index_corrupt = TRUE;
719
720 /*         pa_log("read_index invalidated"); */
721     }
722
723     request_auto_timing_update(s, TRUE);
724 }
725
726 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *tv, void *userdata) {
727     pa_stream *s = userdata;
728
729     pa_assert(s);
730     pa_assert(PA_REFCNT_VALUE(s) >= 1);
731
732     pa_stream_ref(s);
733     request_auto_timing_update(s, FALSE);
734     pa_stream_unref(s);
735 }
736
737 static void create_stream_complete(pa_stream *s) {
738     pa_assert(s);
739     pa_assert(PA_REFCNT_VALUE(s) >= 1);
740     pa_assert(s->state == PA_STREAM_CREATING);
741
742     pa_stream_set_state(s, PA_STREAM_READY);
743
744     if (s->requested_bytes > 0 && s->write_callback)
745         s->write_callback(s, s->requested_bytes, s->write_userdata);
746
747     if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
748         struct timeval tv;
749         pa_gettimeofday(&tv);
750         tv.tv_usec += (suseconds_t) LATENCY_IPOL_INTERVAL_USEC; /* every 100 ms */
751         pa_assert(!s->auto_timing_update_event);
752         s->auto_timing_update_event = s->mainloop->time_new(s->mainloop, &tv, &auto_timing_update_callback, s);
753
754         request_auto_timing_update(s, TRUE);
755     }
756
757     check_smoother_status(s, TRUE, FALSE, FALSE);
758 }
759
760 static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_sample_spec *ss) {
761     pa_assert(s);
762     pa_assert(attr);
763     pa_assert(ss);
764
765     if (s->context->version >= 13)
766         return;
767
768     /* Version older than 0.9.10 didn't do server side buffer_attr
769      * selection, hence we have to fake it on the client side. */
770
771     /* We choose fairly conservative values here, to not confuse
772      * old clients with extremely large playback buffers */
773
774     if (attr->maxlength == (uint32_t) -1)
775         attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
776
777     if (attr->tlength == (uint32_t) -1)
778         attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
779
780     if (attr->minreq == (uint32_t) -1)
781         attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
782
783     if (attr->prebuf == (uint32_t) -1)
784         attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
785
786     if (attr->fragsize == (uint32_t) -1)
787         attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
788 }
789
790 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
791     pa_stream *s = userdata;
792
793     pa_assert(pd);
794     pa_assert(s);
795     pa_assert(PA_REFCNT_VALUE(s) >= 1);
796     pa_assert(s->state == PA_STREAM_CREATING);
797
798     pa_stream_ref(s);
799
800     if (command != PA_COMMAND_REPLY) {
801         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
802             goto finish;
803
804         pa_stream_set_state(s, PA_STREAM_FAILED);
805         goto finish;
806     }
807
808     if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
809         s->channel == PA_INVALID_INDEX ||
810         ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 ||  s->stream_index == PA_INVALID_INDEX)) ||
811         ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &s->requested_bytes) < 0)) {
812         pa_context_fail(s->context, PA_ERR_PROTOCOL);
813         goto finish;
814     }
815
816     if (s->context->version >= 9) {
817         if (s->direction == PA_STREAM_PLAYBACK) {
818             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
819                 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
820                 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
821                 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
822                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
823                 goto finish;
824             }
825         } else if (s->direction == PA_STREAM_RECORD) {
826             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
827                 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
828                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
829                 goto finish;
830             }
831         }
832     }
833
834     if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
835         pa_sample_spec ss;
836         pa_channel_map cm;
837         const char *dn = NULL;
838         pa_bool_t suspended;
839
840         if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
841             pa_tagstruct_get_channel_map(t, &cm) < 0 ||
842             pa_tagstruct_getu32(t, &s->device_index) < 0 ||
843             pa_tagstruct_gets(t, &dn) < 0 ||
844             pa_tagstruct_get_boolean(t, &suspended) < 0) {
845             pa_context_fail(s->context, PA_ERR_PROTOCOL);
846             goto finish;
847         }
848
849         if (!dn || s->device_index == PA_INVALID_INDEX ||
850             ss.channels != cm.channels ||
851             !pa_channel_map_valid(&cm) ||
852             !pa_sample_spec_valid(&ss) ||
853             (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
854             (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
855             (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))) {
856             pa_context_fail(s->context, PA_ERR_PROTOCOL);
857             goto finish;
858         }
859
860         pa_xfree(s->device_name);
861         s->device_name = pa_xstrdup(dn);
862         s->suspended = suspended;
863
864         s->channel_map = cm;
865         s->sample_spec = ss;
866     }
867
868     if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
869         pa_usec_t usec;
870
871         if (pa_tagstruct_get_usec(t, &usec) < 0) {
872             pa_context_fail(s->context, PA_ERR_PROTOCOL);
873             goto finish;
874         }
875
876         if (s->direction == PA_STREAM_RECORD)
877             s->timing_info.configured_source_usec = usec;
878         else
879             s->timing_info.configured_sink_usec = usec;
880     }
881
882     if (!pa_tagstruct_eof(t)) {
883         pa_context_fail(s->context, PA_ERR_PROTOCOL);
884         goto finish;
885     }
886
887     if (s->direction == PA_STREAM_RECORD) {
888         pa_assert(!s->record_memblockq);
889
890         s->record_memblockq = pa_memblockq_new(
891                 0,
892                 s->buffer_attr.maxlength,
893                 0,
894                 pa_frame_size(&s->sample_spec),
895                 1,
896                 0,
897                 0,
898                 NULL);
899     }
900
901     s->channel_valid = TRUE;
902     pa_dynarray_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, s->channel, s);
903
904     create_stream_complete(s);
905
906 finish:
907     pa_stream_unref(s);
908 }
909
910 static int create_stream(
911         pa_stream_direction_t direction,
912         pa_stream *s,
913         const char *dev,
914         const pa_buffer_attr *attr,
915         pa_stream_flags_t flags,
916         const pa_cvolume *volume,
917         pa_stream *sync_stream) {
918
919     pa_tagstruct *t;
920     uint32_t tag;
921     pa_bool_t volume_set = FALSE;
922
923     pa_assert(s);
924     pa_assert(PA_REFCNT_VALUE(s) >= 1);
925     pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
926
927     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
928     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
929     PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
930     PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
931                                               PA_STREAM_INTERPOLATE_TIMING|
932                                               PA_STREAM_NOT_MONOTONIC|
933                                               PA_STREAM_AUTO_TIMING_UPDATE|
934                                               PA_STREAM_NO_REMAP_CHANNELS|
935                                               PA_STREAM_NO_REMIX_CHANNELS|
936                                               PA_STREAM_FIX_FORMAT|
937                                               PA_STREAM_FIX_RATE|
938                                               PA_STREAM_FIX_CHANNELS|
939                                               PA_STREAM_DONT_MOVE|
940                                               PA_STREAM_VARIABLE_RATE|
941                                               PA_STREAM_PEAK_DETECT|
942                                               PA_STREAM_START_MUTED|
943                                               PA_STREAM_ADJUST_LATENCY|
944                                               PA_STREAM_EARLY_REQUESTS|
945                                               PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
946                                               PA_STREAM_START_UNMUTED|
947                                               PA_STREAM_FAIL_ON_SUSPEND)), PA_ERR_INVALID);
948
949     PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
950     PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
951     /* Althought some of the other flags are not supported on older
952      * version, we don't check for them here, because it doesn't hurt
953      * when they are passed but actually not supported. This makes
954      * client development easier */
955
956     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
957     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
958     PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
959     PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
960     PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
961
962     pa_stream_ref(s);
963
964     s->direction = direction;
965     s->flags = flags;
966     s->corked = !!(flags & PA_STREAM_START_CORKED);
967
968     if (sync_stream)
969         s->syncid = sync_stream->syncid;
970
971     if (attr)
972         s->buffer_attr = *attr;
973     automatic_buffer_attr(s, &s->buffer_attr, &s->sample_spec);
974
975     if (flags & PA_STREAM_INTERPOLATE_TIMING) {
976         pa_usec_t x;
977
978         if (s->smoother)
979             pa_smoother_free(s->smoother);
980
981         s->smoother = pa_smoother_new(SMOOTHER_ADJUST_TIME, SMOOTHER_HISTORY_TIME, !(flags & PA_STREAM_NOT_MONOTONIC), SMOOTHER_MIN_HISTORY);
982
983         x = pa_rtclock_usec();
984         pa_smoother_set_time_offset(s->smoother, x);
985         pa_smoother_pause(s->smoother, x);
986     }
987
988     if (!dev)
989         dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
990
991     t = pa_tagstruct_command(
992             s->context,
993             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
994             &tag);
995
996     if (s->context->version < 13)
997         pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
998
999     pa_tagstruct_put(
1000             t,
1001             PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1002             PA_TAG_CHANNEL_MAP, &s->channel_map,
1003             PA_TAG_U32, PA_INVALID_INDEX,
1004             PA_TAG_STRING, dev,
1005             PA_TAG_U32, s->buffer_attr.maxlength,
1006             PA_TAG_BOOLEAN, s->corked,
1007             PA_TAG_INVALID);
1008
1009     if (s->direction == PA_STREAM_PLAYBACK) {
1010         pa_cvolume cv;
1011
1012         pa_tagstruct_put(
1013                 t,
1014                 PA_TAG_U32, s->buffer_attr.tlength,
1015                 PA_TAG_U32, s->buffer_attr.prebuf,
1016                 PA_TAG_U32, s->buffer_attr.minreq,
1017                 PA_TAG_U32, s->syncid,
1018                 PA_TAG_INVALID);
1019
1020         volume_set = !!volume;
1021
1022         if (!volume)
1023             volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1024
1025         pa_tagstruct_put_cvolume(t, volume);
1026     } else
1027         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1028
1029     if (s->context->version >= 12) {
1030         pa_tagstruct_put(
1031                 t,
1032                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1033                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1034                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1035                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1036                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1037                 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1038                 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1039                 PA_TAG_INVALID);
1040     }
1041
1042     if (s->context->version >= 13) {
1043
1044         if (s->direction == PA_STREAM_PLAYBACK)
1045             pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1046         else
1047             pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1048
1049         pa_tagstruct_put(
1050                 t,
1051                 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1052                 PA_TAG_PROPLIST, s->proplist,
1053                 PA_TAG_INVALID);
1054
1055         if (s->direction == PA_STREAM_RECORD)
1056             pa_tagstruct_putu32(t, s->direct_on_input);
1057     }
1058
1059     if (s->context->version >= 14) {
1060
1061         if (s->direction == PA_STREAM_PLAYBACK)
1062             pa_tagstruct_put_boolean(t, volume_set);
1063
1064         pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1065     }
1066
1067     if (s->context->version >= 15) {
1068
1069         if (s->direction == PA_STREAM_PLAYBACK)
1070             pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1071
1072         pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1073         pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1074     }
1075
1076     pa_pstream_send_tagstruct(s->context->pstream, t);
1077     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1078
1079     pa_stream_set_state(s, PA_STREAM_CREATING);
1080
1081     pa_stream_unref(s);
1082     return 0;
1083 }
1084
1085 int pa_stream_connect_playback(
1086         pa_stream *s,
1087         const char *dev,
1088         const pa_buffer_attr *attr,
1089         pa_stream_flags_t flags,
1090         pa_cvolume *volume,
1091         pa_stream *sync_stream) {
1092
1093     pa_assert(s);
1094     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1095
1096     return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1097 }
1098
1099 int pa_stream_connect_record(
1100         pa_stream *s,
1101         const char *dev,
1102         const pa_buffer_attr *attr,
1103         pa_stream_flags_t flags) {
1104
1105     pa_assert(s);
1106     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1107
1108     return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1109 }
1110
1111 int pa_stream_write(
1112         pa_stream *s,
1113         const void *data,
1114         size_t length,
1115         void (*free_cb)(void *p),
1116         int64_t offset,
1117         pa_seek_mode_t seek) {
1118
1119     pa_memchunk chunk;
1120     pa_seek_mode_t t_seek;
1121     int64_t t_offset;
1122     size_t t_length;
1123     const void *t_data;
1124
1125     pa_assert(s);
1126     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1127     pa_assert(data);
1128
1129     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1130     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1131     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1132     PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1133     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1134
1135     if (length <= 0)
1136         return 0;
1137
1138     t_seek = seek;
1139     t_offset = offset;
1140     t_length = length;
1141     t_data = data;
1142
1143     while (t_length > 0) {
1144
1145         chunk.index = 0;
1146
1147         if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1148             chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1149             chunk.length = t_length;
1150         } else {
1151             void *d;
1152
1153             chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1154             chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1155
1156             d = pa_memblock_acquire(chunk.memblock);
1157             memcpy(d, t_data, chunk.length);
1158             pa_memblock_release(chunk.memblock);
1159         }
1160
1161         pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1162
1163         t_offset = 0;
1164         t_seek = PA_SEEK_RELATIVE;
1165
1166         t_data = (const uint8_t*) t_data + chunk.length;
1167         t_length -= chunk.length;
1168
1169         pa_memblock_unref(chunk.memblock);
1170     }
1171
1172     if (free_cb && pa_pstream_get_shm(s->context->pstream))
1173         free_cb((void*) data);
1174
1175     if (length < s->requested_bytes)
1176         s->requested_bytes -= (uint32_t) length;
1177     else
1178         s->requested_bytes = 0;
1179
1180     /* FIXME!!! ^^^ will break when offset is != 0 and mode is not RELATIVE*/
1181
1182     if (s->direction == PA_STREAM_PLAYBACK) {
1183
1184         /* Update latency request correction */
1185         if (s->write_index_corrections[s->current_write_index_correction].valid) {
1186
1187             if (seek == PA_SEEK_ABSOLUTE) {
1188                 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1189                 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1190                 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1191             } else if (seek == PA_SEEK_RELATIVE) {
1192                 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1193                     s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1194             } else
1195                 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1196         }
1197
1198         /* Update the write index in the already available latency data */
1199         if (s->timing_info_valid) {
1200
1201             if (seek == PA_SEEK_ABSOLUTE) {
1202                 s->timing_info.write_index_corrupt = FALSE;
1203                 s->timing_info.write_index = offset + (int64_t) length;
1204             } else if (seek == PA_SEEK_RELATIVE) {
1205                 if (!s->timing_info.write_index_corrupt)
1206                     s->timing_info.write_index += offset + (int64_t) length;
1207             } else
1208                 s->timing_info.write_index_corrupt = TRUE;
1209         }
1210
1211         if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1212             request_auto_timing_update(s, TRUE);
1213     }
1214
1215     return 0;
1216 }
1217
1218 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1219     pa_assert(s);
1220     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1221     pa_assert(data);
1222     pa_assert(length);
1223
1224     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1225     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1226     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1227
1228     if (!s->peek_memchunk.memblock) {
1229
1230         if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1231             *data = NULL;
1232             *length = 0;
1233             return 0;
1234         }
1235
1236         s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1237     }
1238
1239     pa_assert(s->peek_data);
1240     *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1241     *length = s->peek_memchunk.length;
1242     return 0;
1243 }
1244
1245 int pa_stream_drop(pa_stream *s) {
1246     pa_assert(s);
1247     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1248
1249     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1250     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1251     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1252     PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1253
1254     pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1255
1256     /* Fix the simulated local read index */
1257     if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1258         s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1259
1260     pa_assert(s->peek_data);
1261     pa_memblock_release(s->peek_memchunk.memblock);
1262     pa_memblock_unref(s->peek_memchunk.memblock);
1263     pa_memchunk_reset(&s->peek_memchunk);
1264
1265     return 0;
1266 }
1267
1268 size_t pa_stream_writable_size(pa_stream *s) {
1269     pa_assert(s);
1270     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1271
1272     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1273     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1274     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1275
1276     return s->requested_bytes;
1277 }
1278
1279 size_t pa_stream_readable_size(pa_stream *s) {
1280     pa_assert(s);
1281     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1282
1283     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1284     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1285     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1286
1287     return pa_memblockq_get_length(s->record_memblockq);
1288 }
1289
1290 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1291     pa_operation *o;
1292     pa_tagstruct *t;
1293     uint32_t tag;
1294
1295     pa_assert(s);
1296     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1297
1298     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1299     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1300     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1301
1302     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1303
1304     t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1305     pa_tagstruct_putu32(t, s->channel);
1306     pa_pstream_send_tagstruct(s->context->pstream, t);
1307     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1308
1309     return o;
1310 }
1311
1312 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1313     pa_usec_t usec;
1314
1315     pa_assert(s);
1316     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1317     pa_assert(s->state == PA_STREAM_READY);
1318     pa_assert(s->direction != PA_STREAM_UPLOAD);
1319     pa_assert(s->timing_info_valid);
1320     pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1321     pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1322
1323     if (s->direction == PA_STREAM_PLAYBACK) {
1324         /* The last byte that was written into the output device
1325          * had this time value associated */
1326         usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1327
1328         if (!s->corked && !s->suspended) {
1329
1330             if (!ignore_transport)
1331                 /* Because the latency info took a little time to come
1332                  * to us, we assume that the real output time is actually
1333                  * a little ahead */
1334                 usec += s->timing_info.transport_usec;
1335
1336             /* However, the output device usually maintains a buffer
1337                too, hence the real sample currently played is a little
1338                back  */
1339             if (s->timing_info.sink_usec >= usec)
1340                 usec = 0;
1341             else
1342                 usec -= s->timing_info.sink_usec;
1343         }
1344
1345     } else {
1346         pa_assert(s->direction == PA_STREAM_RECORD);
1347
1348         /* The last byte written into the server side queue had
1349          * this time value associated */
1350         usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1351
1352         if (!s->corked && !s->suspended) {
1353
1354             if (!ignore_transport)
1355                 /* Add transport latency */
1356                 usec += s->timing_info.transport_usec;
1357
1358             /* Add latency of data in device buffer */
1359             usec += s->timing_info.source_usec;
1360
1361             /* If this is a monitor source, we need to correct the
1362              * time by the playback device buffer */
1363             if (s->timing_info.sink_usec >= usec)
1364                 usec = 0;
1365             else
1366                 usec -= s->timing_info.sink_usec;
1367         }
1368     }
1369
1370     return usec;
1371 }
1372
1373 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1374     pa_operation *o = userdata;
1375     struct timeval local, remote, now;
1376     pa_timing_info *i;
1377     pa_bool_t playing = FALSE;
1378     uint64_t underrun_for = 0, playing_for = 0;
1379
1380     pa_assert(pd);
1381     pa_assert(o);
1382     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1383
1384     if (!o->context || !o->stream)
1385         goto finish;
1386
1387     i = &o->stream->timing_info;
1388
1389     o->stream->timing_info_valid = FALSE;
1390     i->write_index_corrupt = TRUE;
1391     i->read_index_corrupt = TRUE;
1392
1393     if (command != PA_COMMAND_REPLY) {
1394         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1395             goto finish;
1396
1397     } else {
1398
1399         if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1400             pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1401             pa_tagstruct_get_boolean(t, &playing) < 0 ||
1402             pa_tagstruct_get_timeval(t, &local) < 0 ||
1403             pa_tagstruct_get_timeval(t, &remote) < 0 ||
1404             pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1405             pa_tagstruct_gets64(t, &i->read_index) < 0) {
1406
1407             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1408             goto finish;
1409         }
1410
1411         if (o->context->version >= 13 &&
1412             o->stream->direction == PA_STREAM_PLAYBACK)
1413             if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1414                 pa_tagstruct_getu64(t, &playing_for) < 0) {
1415
1416                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1417                 goto finish;
1418             }
1419
1420
1421         if (!pa_tagstruct_eof(t)) {
1422             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1423             goto finish;
1424         }
1425         o->stream->timing_info_valid = TRUE;
1426         i->write_index_corrupt = FALSE;
1427         i->read_index_corrupt = FALSE;
1428
1429         i->playing = (int) playing;
1430         i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1431
1432         pa_gettimeofday(&now);
1433
1434         /* Calculcate timestamps */
1435         if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1436             /* local and remote seem to have synchronized clocks */
1437
1438             if (o->stream->direction == PA_STREAM_PLAYBACK)
1439                 i->transport_usec = pa_timeval_diff(&remote, &local);
1440             else
1441                 i->transport_usec = pa_timeval_diff(&now, &remote);
1442
1443             i->synchronized_clocks = TRUE;
1444             i->timestamp = remote;
1445         } else {
1446             /* clocks are not synchronized, let's estimate latency then */
1447             i->transport_usec = pa_timeval_diff(&now, &local)/2;
1448             i->synchronized_clocks = FALSE;
1449             i->timestamp = local;
1450             pa_timeval_add(&i->timestamp, i->transport_usec);
1451         }
1452
1453         /* Invalidate read and write indexes if necessary */
1454         if (tag < o->stream->read_index_not_before)
1455             i->read_index_corrupt = TRUE;
1456
1457         if (tag < o->stream->write_index_not_before)
1458             i->write_index_corrupt = TRUE;
1459
1460         if (o->stream->direction == PA_STREAM_PLAYBACK) {
1461             /* Write index correction */
1462
1463             int n, j;
1464             uint32_t ctag = tag;
1465
1466             /* Go through the saved correction values and add up the
1467              * total correction.*/
1468             for (n = 0, j = o->stream->current_write_index_correction+1;
1469                  n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1470                  n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1471
1472                 /* Step over invalid data or out-of-date data */
1473                 if (!o->stream->write_index_corrections[j].valid ||
1474                     o->stream->write_index_corrections[j].tag < ctag)
1475                     continue;
1476
1477                 /* Make sure that everything is in order */
1478                 ctag = o->stream->write_index_corrections[j].tag+1;
1479
1480                 /* Now fix the write index */
1481                 if (o->stream->write_index_corrections[j].corrupt) {
1482                     /* A corrupting seek was made */
1483                     i->write_index_corrupt = TRUE;
1484                 } else if (o->stream->write_index_corrections[j].absolute) {
1485                     /* An absolute seek was made */
1486                     i->write_index = o->stream->write_index_corrections[j].value;
1487                     i->write_index_corrupt = FALSE;
1488                 } else if (!i->write_index_corrupt) {
1489                     /* A relative seek was made */
1490                     i->write_index += o->stream->write_index_corrections[j].value;
1491                 }
1492             }
1493
1494             /* Clear old correction entries */
1495             for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1496                 if (!o->stream->write_index_corrections[n].valid)
1497                     continue;
1498
1499                 if (o->stream->write_index_corrections[n].tag <= tag)
1500                     o->stream->write_index_corrections[n].valid = FALSE;
1501             }
1502         }
1503
1504         if (o->stream->direction == PA_STREAM_RECORD) {
1505             /* Read index correction */
1506
1507             if (!i->read_index_corrupt)
1508                 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1509         }
1510
1511         /* Update smoother */
1512         if (o->stream->smoother) {
1513             pa_usec_t u, x;
1514
1515             u = x = pa_rtclock_usec() - i->transport_usec;
1516
1517             if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1518                 pa_usec_t su;
1519
1520                 /* If we weren't playing then it will take some time
1521                  * until the audio will actually come out through the
1522                  * speakers. Since we follow that timing here, we need
1523                  * to try to fix this up */
1524
1525                 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1526
1527                 if (su < i->sink_usec)
1528                     x += i->sink_usec - su;
1529             }
1530
1531             if (!i->playing)
1532                 pa_smoother_pause(o->stream->smoother, x);
1533
1534             /* Update the smoother */
1535             if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1536                 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1537                 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1538
1539             if (i->playing)
1540                 pa_smoother_resume(o->stream->smoother, x);
1541         }
1542     }
1543
1544     o->stream->auto_timing_update_requested = FALSE;
1545
1546     if (o->stream->latency_update_callback)
1547         o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1548
1549     if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1550         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1551         cb(o->stream, o->stream->timing_info_valid, o->userdata);
1552     }
1553
1554 finish:
1555
1556     pa_operation_done(o);
1557     pa_operation_unref(o);
1558 }
1559
1560 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1561     uint32_t tag;
1562     pa_operation *o;
1563     pa_tagstruct *t;
1564     struct timeval now;
1565     int cidx = 0;
1566
1567     pa_assert(s);
1568     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1569
1570     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1571     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1572     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1573
1574     if (s->direction == PA_STREAM_PLAYBACK) {
1575         /* Find a place to store the write_index correction data for this entry */
1576         cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1577
1578         /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1579         PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1580     }
1581     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1582
1583     t = pa_tagstruct_command(
1584             s->context,
1585             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1586             &tag);
1587     pa_tagstruct_putu32(t, s->channel);
1588     pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1589
1590     pa_pstream_send_tagstruct(s->context->pstream, t);
1591     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1592
1593     if (s->direction == PA_STREAM_PLAYBACK) {
1594         /* Fill in initial correction data */
1595
1596         s->current_write_index_correction = cidx;
1597
1598         s->write_index_corrections[cidx].valid = TRUE;
1599         s->write_index_corrections[cidx].absolute = FALSE;
1600         s->write_index_corrections[cidx].corrupt = FALSE;
1601         s->write_index_corrections[cidx].tag = tag;
1602         s->write_index_corrections[cidx].value = 0;
1603     }
1604
1605     return o;
1606 }
1607
1608 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1609     pa_stream *s = userdata;
1610
1611     pa_assert(pd);
1612     pa_assert(s);
1613     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1614
1615     pa_stream_ref(s);
1616
1617     if (command != PA_COMMAND_REPLY) {
1618         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1619             goto finish;
1620
1621         pa_stream_set_state(s, PA_STREAM_FAILED);
1622         goto finish;
1623     } else if (!pa_tagstruct_eof(t)) {
1624         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1625         goto finish;
1626     }
1627
1628     pa_stream_set_state(s, PA_STREAM_TERMINATED);
1629
1630 finish:
1631     pa_stream_unref(s);
1632 }
1633
1634 int pa_stream_disconnect(pa_stream *s) {
1635     pa_tagstruct *t;
1636     uint32_t tag;
1637
1638     pa_assert(s);
1639     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1640
1641     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1642     PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1643     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1644
1645     pa_stream_ref(s);
1646
1647     t = pa_tagstruct_command(
1648             s->context,
1649             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1650                         (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1651             &tag);
1652     pa_tagstruct_putu32(t, s->channel);
1653     pa_pstream_send_tagstruct(s->context->pstream, t);
1654     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1655
1656     pa_stream_unref(s);
1657     return 0;
1658 }
1659
1660 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1661     pa_assert(s);
1662     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1663
1664     if (pa_detect_fork())
1665         return;
1666
1667     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1668         return;
1669
1670     s->read_callback = cb;
1671     s->read_userdata = userdata;
1672 }
1673
1674 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1675     pa_assert(s);
1676     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1677
1678     if (pa_detect_fork())
1679         return;
1680
1681     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1682         return;
1683
1684     s->write_callback = cb;
1685     s->write_userdata = userdata;
1686 }
1687
1688 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1689     pa_assert(s);
1690     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1691
1692     if (pa_detect_fork())
1693         return;
1694
1695     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1696         return;
1697
1698     s->state_callback = cb;
1699     s->state_userdata = userdata;
1700 }
1701
1702 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1703     pa_assert(s);
1704     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1705
1706     if (pa_detect_fork())
1707         return;
1708
1709     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1710         return;
1711
1712     s->overflow_callback = cb;
1713     s->overflow_userdata = userdata;
1714 }
1715
1716 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1717     pa_assert(s);
1718     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1719
1720     if (pa_detect_fork())
1721         return;
1722
1723     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1724         return;
1725
1726     s->underflow_callback = cb;
1727     s->underflow_userdata = userdata;
1728 }
1729
1730 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1731     pa_assert(s);
1732     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1733
1734     if (pa_detect_fork())
1735         return;
1736
1737     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1738         return;
1739
1740     s->latency_update_callback = cb;
1741     s->latency_update_userdata = userdata;
1742 }
1743
1744 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1745     pa_assert(s);
1746     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1747
1748     if (pa_detect_fork())
1749         return;
1750
1751     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1752         return;
1753
1754     s->moved_callback = cb;
1755     s->moved_userdata = userdata;
1756 }
1757
1758 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1759     pa_assert(s);
1760     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1761
1762     if (pa_detect_fork())
1763         return;
1764
1765     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1766         return;
1767
1768     s->suspended_callback = cb;
1769     s->suspended_userdata = userdata;
1770 }
1771
1772 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1773     pa_assert(s);
1774     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1775
1776     if (pa_detect_fork())
1777         return;
1778
1779     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1780         return;
1781
1782     s->started_callback = cb;
1783     s->started_userdata = userdata;
1784 }
1785
1786 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
1787     pa_assert(s);
1788     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1789
1790     if (pa_detect_fork())
1791         return;
1792
1793     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1794         return;
1795
1796     s->event_callback = cb;
1797     s->event_userdata = userdata;
1798 }
1799
1800 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1801     pa_operation *o = userdata;
1802     int success = 1;
1803
1804     pa_assert(pd);
1805     pa_assert(o);
1806     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1807
1808     if (!o->context)
1809         goto finish;
1810
1811     if (command != PA_COMMAND_REPLY) {
1812         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1813             goto finish;
1814
1815         success = 0;
1816     } else if (!pa_tagstruct_eof(t)) {
1817         pa_context_fail(o->context, PA_ERR_PROTOCOL);
1818         goto finish;
1819     }
1820
1821     if (o->callback) {
1822         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1823         cb(o->stream, success, o->userdata);
1824     }
1825
1826 finish:
1827     pa_operation_done(o);
1828     pa_operation_unref(o);
1829 }
1830
1831 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
1832     pa_operation *o;
1833     pa_tagstruct *t;
1834     uint32_t tag;
1835
1836     pa_assert(s);
1837     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1838
1839     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1840     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1841     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1842
1843     s->corked = b;
1844
1845     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1846
1847     t = pa_tagstruct_command(
1848             s->context,
1849             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
1850             &tag);
1851     pa_tagstruct_putu32(t, s->channel);
1852     pa_tagstruct_put_boolean(t, !!b);
1853     pa_pstream_send_tagstruct(s->context->pstream, t);
1854     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1855
1856     check_smoother_status(s, FALSE, FALSE, FALSE);
1857
1858     /* This might cause the indexes to hang/start again, hence
1859      * let's request a timing update */
1860     request_auto_timing_update(s, TRUE);
1861
1862     return o;
1863 }
1864
1865 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
1866     pa_tagstruct *t;
1867     pa_operation *o;
1868     uint32_t tag;
1869
1870     pa_assert(s);
1871     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1872
1873     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1874     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1875
1876     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1877
1878     t = pa_tagstruct_command(s->context, command, &tag);
1879     pa_tagstruct_putu32(t, s->channel);
1880     pa_pstream_send_tagstruct(s->context->pstream, t);
1881     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1882
1883     return o;
1884 }
1885
1886 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1887     pa_operation *o;
1888
1889     pa_assert(s);
1890     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1891
1892     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1893     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1894     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1895
1896     if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
1897         return NULL;
1898
1899     if (s->direction == PA_STREAM_PLAYBACK) {
1900
1901         if (s->write_index_corrections[s->current_write_index_correction].valid)
1902             s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1903
1904         if (s->buffer_attr.prebuf > 0)
1905             check_smoother_status(s, FALSE, FALSE, TRUE);
1906
1907         /* This will change the write index, but leave the
1908          * read index untouched. */
1909         invalidate_indexes(s, FALSE, TRUE);
1910
1911     } else
1912         /* For record streams this has no influence on the write
1913          * index, but the read index might jump. */
1914         invalidate_indexes(s, TRUE, FALSE);
1915
1916     return o;
1917 }
1918
1919 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1920     pa_operation *o;
1921
1922     pa_assert(s);
1923     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1924
1925     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1926     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1927     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1928     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
1929
1930     if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
1931         return NULL;
1932
1933     /* This might cause the read index to hang again, hence
1934      * let's request a timing update */
1935     request_auto_timing_update(s, TRUE);
1936
1937     return o;
1938 }
1939
1940 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1941     pa_operation *o;
1942
1943     pa_assert(s);
1944     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1945
1946     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1947     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1948     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1949     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
1950
1951     if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
1952         return NULL;
1953
1954     /* This might cause the read index to start moving again, hence
1955      * let's request a timing update */
1956     request_auto_timing_update(s, TRUE);
1957
1958     return o;
1959 }
1960
1961 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
1962     pa_operation *o;
1963
1964     pa_assert(s);
1965     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1966     pa_assert(name);
1967
1968     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1969     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1970     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1971
1972     if (s->context->version >= 13) {
1973         pa_proplist *p = pa_proplist_new();
1974
1975         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
1976         o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
1977         pa_proplist_free(p);
1978     } else {
1979         pa_tagstruct *t;
1980         uint32_t tag;
1981
1982         o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1983         t = pa_tagstruct_command(
1984                 s->context,
1985                 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
1986                 &tag);
1987         pa_tagstruct_putu32(t, s->channel);
1988         pa_tagstruct_puts(t, name);
1989         pa_pstream_send_tagstruct(s->context->pstream, t);
1990         pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1991     }
1992
1993     return o;
1994 }
1995
1996 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
1997     pa_usec_t usec;
1998
1999     pa_assert(s);
2000     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2001
2002     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2003     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2004     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2005     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2006     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2007     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2008
2009     if (s->smoother)
2010         usec = pa_smoother_get(s->smoother, pa_rtclock_usec());
2011     else
2012         usec = calc_time(s, FALSE);
2013
2014     /* Make sure the time runs monotonically */
2015     if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2016         if (usec < s->previous_time)
2017             usec = s->previous_time;
2018         else
2019             s->previous_time = usec;
2020     }
2021
2022     if (r_usec)
2023         *r_usec = usec;
2024
2025     return 0;
2026 }
2027
2028 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2029     pa_assert(s);
2030     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2031
2032     if (negative)
2033         *negative = 0;
2034
2035     if (a >= b)
2036         return a-b;
2037     else {
2038         if (negative && s->direction == PA_STREAM_RECORD) {
2039             *negative = 1;
2040             return b-a;
2041         } else
2042             return 0;
2043     }
2044 }
2045
2046 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2047     pa_usec_t t, c;
2048     int r;
2049     int64_t cindex;
2050
2051     pa_assert(s);
2052     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2053     pa_assert(r_usec);
2054
2055     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2056     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2057     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2058     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2059     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2060     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2061
2062     if ((r = pa_stream_get_time(s, &t)) < 0)
2063         return r;
2064
2065     if (s->direction == PA_STREAM_PLAYBACK)
2066         cindex = s->timing_info.write_index;
2067     else
2068         cindex = s->timing_info.read_index;
2069
2070     if (cindex < 0)
2071         cindex = 0;
2072
2073     c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2074
2075     if (s->direction == PA_STREAM_PLAYBACK)
2076         *r_usec = time_counter_diff(s, c, t, negative);
2077     else
2078         *r_usec = time_counter_diff(s, t, c, negative);
2079
2080     return 0;
2081 }
2082
2083 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2084     pa_assert(s);
2085     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2086
2087     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2088     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2089     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2090     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2091
2092     return &s->timing_info;
2093 }
2094
2095 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2096     pa_assert(s);
2097     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2098
2099     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2100
2101     return &s->sample_spec;
2102 }
2103
2104 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2105     pa_assert(s);
2106     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2107
2108     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2109
2110     return &s->channel_map;
2111 }
2112
2113 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2114     pa_assert(s);
2115     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2116
2117     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2118     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2119     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2120
2121     return &s->buffer_attr;
2122 }
2123
2124 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2125     pa_operation *o = userdata;
2126     int success = 1;
2127
2128     pa_assert(pd);
2129     pa_assert(o);
2130     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2131
2132     if (!o->context)
2133         goto finish;
2134
2135     if (command != PA_COMMAND_REPLY) {
2136         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2137             goto finish;
2138
2139         success = 0;
2140     } else {
2141         if (o->stream->direction == PA_STREAM_PLAYBACK) {
2142             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2143                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2144                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2145                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2146                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2147                 goto finish;
2148             }
2149         } else if (o->stream->direction == PA_STREAM_RECORD) {
2150             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2151                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2152                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2153                 goto finish;
2154             }
2155         }
2156
2157         if (o->stream->context->version >= 13) {
2158             pa_usec_t usec;
2159
2160             if (pa_tagstruct_get_usec(t, &usec) < 0) {
2161                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2162                 goto finish;
2163             }
2164
2165             if (o->stream->direction == PA_STREAM_RECORD)
2166                 o->stream->timing_info.configured_source_usec = usec;
2167             else
2168                 o->stream->timing_info.configured_sink_usec = usec;
2169         }
2170
2171         if (!pa_tagstruct_eof(t)) {
2172             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2173             goto finish;
2174         }
2175     }
2176
2177     if (o->callback) {
2178         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2179         cb(o->stream, success, o->userdata);
2180     }
2181
2182 finish:
2183     pa_operation_done(o);
2184     pa_operation_unref(o);
2185 }
2186
2187
2188 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2189     pa_operation *o;
2190     pa_tagstruct *t;
2191     uint32_t tag;
2192
2193     pa_assert(s);
2194     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2195     pa_assert(attr);
2196
2197     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2198     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2199     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2200     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2201
2202     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2203
2204     t = pa_tagstruct_command(
2205             s->context,
2206             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2207             &tag);
2208     pa_tagstruct_putu32(t, s->channel);
2209
2210     pa_tagstruct_putu32(t, attr->maxlength);
2211
2212     if (s->direction == PA_STREAM_PLAYBACK)
2213         pa_tagstruct_put(
2214                 t,
2215                 PA_TAG_U32, attr->tlength,
2216                 PA_TAG_U32, attr->prebuf,
2217                 PA_TAG_U32, attr->minreq,
2218                 PA_TAG_INVALID);
2219     else
2220         pa_tagstruct_putu32(t, attr->fragsize);
2221
2222     if (s->context->version >= 13)
2223         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2224
2225     if (s->context->version >= 14)
2226         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2227
2228     pa_pstream_send_tagstruct(s->context->pstream, t);
2229     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2230
2231     /* This might cause changes in the read/write indexex, hence let's
2232      * request a timing update */
2233     request_auto_timing_update(s, TRUE);
2234
2235     return o;
2236 }
2237
2238 uint32_t pa_stream_get_device_index(pa_stream *s) {
2239     pa_assert(s);
2240     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2241
2242     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2243     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2244     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2245     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2246     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2247
2248     return s->device_index;
2249 }
2250
2251 const char *pa_stream_get_device_name(pa_stream *s) {
2252     pa_assert(s);
2253     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2254
2255     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2256     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2257     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2258     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2259     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2260
2261     return s->device_name;
2262 }
2263
2264 int pa_stream_is_suspended(pa_stream *s) {
2265     pa_assert(s);
2266     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2267
2268     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2269     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2270     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2271     PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2272
2273     return s->suspended;
2274 }
2275
2276 int pa_stream_is_corked(pa_stream *s) {
2277     pa_assert(s);
2278     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2279
2280     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2281     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2282     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2283
2284     return s->corked;
2285 }
2286
2287 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2288     pa_operation *o = userdata;
2289     int success = 1;
2290
2291     pa_assert(pd);
2292     pa_assert(o);
2293     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2294
2295     if (!o->context)
2296         goto finish;
2297
2298     if (command != PA_COMMAND_REPLY) {
2299         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2300             goto finish;
2301
2302         success = 0;
2303     } else {
2304
2305         if (!pa_tagstruct_eof(t)) {
2306             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2307             goto finish;
2308         }
2309     }
2310
2311     o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2312     pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2313
2314     if (o->callback) {
2315         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2316         cb(o->stream, success, o->userdata);
2317     }
2318
2319 finish:
2320     pa_operation_done(o);
2321     pa_operation_unref(o);
2322 }
2323
2324
2325 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2326     pa_operation *o;
2327     pa_tagstruct *t;
2328     uint32_t tag;
2329
2330     pa_assert(s);
2331     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2332
2333     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2334     PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2335     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2336     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2337     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2338     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2339
2340     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2341     o->private = PA_UINT_TO_PTR(rate);
2342
2343     t = pa_tagstruct_command(
2344             s->context,
2345             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2346             &tag);
2347     pa_tagstruct_putu32(t, s->channel);
2348     pa_tagstruct_putu32(t, rate);
2349
2350     pa_pstream_send_tagstruct(s->context->pstream, t);
2351     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2352
2353     return o;
2354 }
2355
2356 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2357     pa_operation *o;
2358     pa_tagstruct *t;
2359     uint32_t tag;
2360
2361     pa_assert(s);
2362     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2363
2364     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2365     PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2366     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2367     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2368     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2369
2370     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2371
2372     t = pa_tagstruct_command(
2373             s->context,
2374             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2375             &tag);
2376     pa_tagstruct_putu32(t, s->channel);
2377     pa_tagstruct_putu32(t, (uint32_t) mode);
2378     pa_tagstruct_put_proplist(t, p);
2379
2380     pa_pstream_send_tagstruct(s->context->pstream, t);
2381     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2382
2383     /* Please note that we don't update s->proplist here, because we
2384      * don't export that field */
2385
2386     return o;
2387 }
2388
2389 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2390     pa_operation *o;
2391     pa_tagstruct *t;
2392     uint32_t tag;
2393     const char * const*k;
2394
2395     pa_assert(s);
2396     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2397
2398     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2399     PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2400     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2401     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2402     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2403
2404     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2405
2406     t = pa_tagstruct_command(
2407             s->context,
2408             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2409             &tag);
2410     pa_tagstruct_putu32(t, s->channel);
2411
2412     for (k = keys; *k; k++)
2413         pa_tagstruct_puts(t, *k);
2414
2415     pa_tagstruct_puts(t, NULL);
2416
2417     pa_pstream_send_tagstruct(s->context->pstream, t);
2418     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2419
2420     /* Please note that we don't update s->proplist here, because we
2421      * don't export that field */
2422
2423     return o;
2424 }
2425
2426 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2427     pa_assert(s);
2428     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2429
2430     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2431     PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2432     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2433     PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2434
2435     s->direct_on_input = sink_input_idx;
2436
2437     return 0;
2438 }
2439
2440 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2441     pa_assert(s);
2442     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2443
2444     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2445     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2446     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2447
2448     return s->direct_on_input;
2449 }