client: make volume struct const
[profile/ivi/pulseaudio.git] / src / pulse / stream.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/rtclock.h>
34 #include <pulse/xmalloc.h>
35
36 #include <pulsecore/pstream-util.h>
37 #include <pulsecore/log.h>
38 #include <pulsecore/hashmap.h>
39 #include <pulsecore/macro.h>
40 #include <pulsecore/core-rtclock.h>
41
42 #include "fork-detect.h"
43 #include "internal.h"
44
45 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
46 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
47
48 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
49 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
50 #define SMOOTHER_MIN_HISTORY (4)
51
52 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
53     return pa_stream_new_with_proplist(c, name, ss, map, NULL);
54 }
55
56 static void reset_callbacks(pa_stream *s) {
57     s->read_callback = NULL;
58     s->read_userdata = NULL;
59     s->write_callback = NULL;
60     s->write_userdata = NULL;
61     s->state_callback = NULL;
62     s->state_userdata = NULL;
63     s->overflow_callback = NULL;
64     s->overflow_userdata = NULL;
65     s->underflow_callback = NULL;
66     s->underflow_userdata = NULL;
67     s->latency_update_callback = NULL;
68     s->latency_update_userdata = NULL;
69     s->moved_callback = NULL;
70     s->moved_userdata = NULL;
71     s->suspended_callback = NULL;
72     s->suspended_userdata = NULL;
73     s->started_callback = NULL;
74     s->started_userdata = NULL;
75     s->event_callback = NULL;
76     s->event_userdata = NULL;
77     s->buffer_attr_callback = NULL;
78     s->buffer_attr_userdata = NULL;
79 }
80
81 pa_stream *pa_stream_new_with_proplist(
82         pa_context *c,
83         const char *name,
84         const pa_sample_spec *ss,
85         const pa_channel_map *map,
86         pa_proplist *p) {
87
88     pa_stream *s;
89     int i;
90     pa_channel_map tmap;
91
92     pa_assert(c);
93     pa_assert(PA_REFCNT_VALUE(c) >= 1);
94
95     PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
96     PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
97     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
98     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
99     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
100     PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
101     PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
102
103     if (!map)
104         PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
105
106     s = pa_xnew(pa_stream, 1);
107     PA_REFCNT_INIT(s);
108     s->context = c;
109     s->mainloop = c->mainloop;
110
111     s->direction = PA_STREAM_NODIRECTION;
112     s->state = PA_STREAM_UNCONNECTED;
113     s->flags = 0;
114
115     s->sample_spec = *ss;
116     s->channel_map = *map;
117
118     s->direct_on_input = PA_INVALID_INDEX;
119
120     s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
121     if (name)
122         pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
123
124     s->channel = 0;
125     s->channel_valid = FALSE;
126     s->syncid = c->csyncid++;
127     s->stream_index = PA_INVALID_INDEX;
128
129     s->requested_bytes = 0;
130     memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
131
132     /* We initialize der target length here, so that if the user
133      * passes no explicit buffering metrics the default is similar to
134      * what older PA versions provided. */
135
136     s->buffer_attr.maxlength = (uint32_t) -1;
137     s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
138     s->buffer_attr.minreq = (uint32_t) -1;
139     s->buffer_attr.prebuf = (uint32_t) -1;
140     s->buffer_attr.fragsize = (uint32_t) -1;
141
142     s->device_index = PA_INVALID_INDEX;
143     s->device_name = NULL;
144     s->suspended = FALSE;
145     s->corked = FALSE;
146
147     s->write_memblock = NULL;
148     s->write_data = NULL;
149
150     pa_memchunk_reset(&s->peek_memchunk);
151     s->peek_data = NULL;
152     s->record_memblockq = NULL;
153
154     memset(&s->timing_info, 0, sizeof(s->timing_info));
155     s->timing_info_valid = FALSE;
156
157     s->previous_time = 0;
158
159     s->read_index_not_before = 0;
160     s->write_index_not_before = 0;
161     for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
162         s->write_index_corrections[i].valid = 0;
163     s->current_write_index_correction = 0;
164
165     s->auto_timing_update_event = NULL;
166     s->auto_timing_update_requested = FALSE;
167     s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
168
169     reset_callbacks(s);
170
171     s->smoother = NULL;
172
173     /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
174     PA_LLIST_PREPEND(pa_stream, c->streams, s);
175     pa_stream_ref(s);
176
177     return s;
178 }
179
180 static void stream_unlink(pa_stream *s) {
181     pa_operation *o, *n;
182     pa_assert(s);
183
184     if (!s->context)
185         return;
186
187     /* Detach from context */
188
189     /* Unref all operatio object that point to us */
190     for (o = s->context->operations; o; o = n) {
191         n = o->next;
192
193         if (o->stream == s)
194             pa_operation_cancel(o);
195     }
196
197     /* Drop all outstanding replies for this stream */
198     if (s->context->pdispatch)
199         pa_pdispatch_unregister_reply(s->context->pdispatch, s);
200
201     if (s->channel_valid) {
202         pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL);
203         s->channel = 0;
204         s->channel_valid = FALSE;
205     }
206
207     PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
208     pa_stream_unref(s);
209
210     s->context = NULL;
211
212     if (s->auto_timing_update_event) {
213         pa_assert(s->mainloop);
214         s->mainloop->time_free(s->auto_timing_update_event);
215     }
216
217     reset_callbacks(s);
218 }
219
220 static void stream_free(pa_stream *s) {
221     pa_assert(s);
222
223     stream_unlink(s);
224
225     if (s->write_memblock) {
226         pa_memblock_release(s->write_memblock);
227         pa_memblock_unref(s->write_data);
228     }
229
230     if (s->peek_memchunk.memblock) {
231         if (s->peek_data)
232             pa_memblock_release(s->peek_memchunk.memblock);
233         pa_memblock_unref(s->peek_memchunk.memblock);
234     }
235
236     if (s->record_memblockq)
237         pa_memblockq_free(s->record_memblockq);
238
239     if (s->proplist)
240         pa_proplist_free(s->proplist);
241
242     if (s->smoother)
243         pa_smoother_free(s->smoother);
244
245     pa_xfree(s->device_name);
246     pa_xfree(s);
247 }
248
249 void pa_stream_unref(pa_stream *s) {
250     pa_assert(s);
251     pa_assert(PA_REFCNT_VALUE(s) >= 1);
252
253     if (PA_REFCNT_DEC(s) <= 0)
254         stream_free(s);
255 }
256
257 pa_stream* pa_stream_ref(pa_stream *s) {
258     pa_assert(s);
259     pa_assert(PA_REFCNT_VALUE(s) >= 1);
260
261     PA_REFCNT_INC(s);
262     return s;
263 }
264
265 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
266     pa_assert(s);
267     pa_assert(PA_REFCNT_VALUE(s) >= 1);
268
269     return s->state;
270 }
271
272 pa_context* pa_stream_get_context(pa_stream *s) {
273     pa_assert(s);
274     pa_assert(PA_REFCNT_VALUE(s) >= 1);
275
276     return s->context;
277 }
278
279 uint32_t pa_stream_get_index(pa_stream *s) {
280     pa_assert(s);
281     pa_assert(PA_REFCNT_VALUE(s) >= 1);
282
283     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
284     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
285
286     return s->stream_index;
287 }
288
289 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
290     pa_assert(s);
291     pa_assert(PA_REFCNT_VALUE(s) >= 1);
292
293     if (s->state == st)
294         return;
295
296     pa_stream_ref(s);
297
298     s->state = st;
299
300     if (s->state_callback)
301         s->state_callback(s, s->state_userdata);
302
303     if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
304         stream_unlink(s);
305
306     pa_stream_unref(s);
307 }
308
309 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
310     pa_assert(s);
311     pa_assert(PA_REFCNT_VALUE(s) >= 1);
312
313     if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
314         return;
315
316     if (s->state == PA_STREAM_READY &&
317         (force || !s->auto_timing_update_requested)) {
318         pa_operation *o;
319
320 /*         pa_log("Automatically requesting new timing data"); */
321
322         if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
323             pa_operation_unref(o);
324             s->auto_timing_update_requested = TRUE;
325         }
326     }
327
328     if (s->auto_timing_update_event) {
329         if (force)
330             s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
331
332         pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
333
334         s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
335     }
336 }
337
338 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
339     pa_context *c = userdata;
340     pa_stream *s;
341     uint32_t channel;
342
343     pa_assert(pd);
344     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
345     pa_assert(t);
346     pa_assert(c);
347     pa_assert(PA_REFCNT_VALUE(c) >= 1);
348
349     pa_context_ref(c);
350
351     if (pa_tagstruct_getu32(t, &channel) < 0 ||
352         !pa_tagstruct_eof(t)) {
353         pa_context_fail(c, PA_ERR_PROTOCOL);
354         goto finish;
355     }
356
357     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel)))
358         goto finish;
359
360     if (s->state != PA_STREAM_READY)
361         goto finish;
362
363     pa_context_set_error(c, PA_ERR_KILLED);
364     pa_stream_set_state(s, PA_STREAM_FAILED);
365
366 finish:
367     pa_context_unref(c);
368 }
369
370 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
371     pa_usec_t x;
372
373     pa_assert(s);
374     pa_assert(!force_start || !force_stop);
375
376     if (!s->smoother)
377         return;
378
379     x = pa_rtclock_now();
380
381     if (s->timing_info_valid) {
382         if (aposteriori)
383             x -= s->timing_info.transport_usec;
384         else
385             x += s->timing_info.transport_usec;
386     }
387
388     if (s->suspended || s->corked || force_stop)
389         pa_smoother_pause(s->smoother, x);
390     else if (force_start || s->buffer_attr.prebuf == 0)
391         pa_smoother_resume(s->smoother, x, TRUE);
392
393
394     /* Please note that we have no idea if playback actually started
395      * if prebuf is non-zero! */
396 }
397
398 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
399     pa_context *c = userdata;
400     pa_stream *s;
401     uint32_t channel;
402     const char *dn;
403     pa_bool_t suspended;
404     uint32_t di;
405     pa_usec_t usec = 0;
406     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
407
408     pa_assert(pd);
409     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
410     pa_assert(t);
411     pa_assert(c);
412     pa_assert(PA_REFCNT_VALUE(c) >= 1);
413
414     pa_context_ref(c);
415
416     if (c->version < 12) {
417         pa_context_fail(c, PA_ERR_PROTOCOL);
418         goto finish;
419     }
420
421     if (pa_tagstruct_getu32(t, &channel) < 0 ||
422         pa_tagstruct_getu32(t, &di) < 0 ||
423         pa_tagstruct_gets(t, &dn) < 0 ||
424         pa_tagstruct_get_boolean(t, &suspended) < 0) {
425         pa_context_fail(c, PA_ERR_PROTOCOL);
426         goto finish;
427     }
428
429     if (c->version >= 13) {
430
431         if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
432             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
433                 pa_tagstruct_getu32(t, &fragsize) < 0 ||
434                 pa_tagstruct_get_usec(t, &usec) < 0) {
435                 pa_context_fail(c, PA_ERR_PROTOCOL);
436                 goto finish;
437             }
438         } else {
439             if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
440                 pa_tagstruct_getu32(t, &tlength) < 0 ||
441                 pa_tagstruct_getu32(t, &prebuf) < 0 ||
442                 pa_tagstruct_getu32(t, &minreq) < 0 ||
443                 pa_tagstruct_get_usec(t, &usec) < 0) {
444                 pa_context_fail(c, PA_ERR_PROTOCOL);
445                 goto finish;
446             }
447         }
448     }
449
450     if (!pa_tagstruct_eof(t)) {
451         pa_context_fail(c, PA_ERR_PROTOCOL);
452         goto finish;
453     }
454
455     if (!dn || di == PA_INVALID_INDEX) {
456         pa_context_fail(c, PA_ERR_PROTOCOL);
457         goto finish;
458     }
459
460     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, channel)))
461         goto finish;
462
463     if (s->state != PA_STREAM_READY)
464         goto finish;
465
466     if (c->version >= 13) {
467         if (s->direction == PA_STREAM_RECORD)
468             s->timing_info.configured_source_usec = usec;
469         else
470             s->timing_info.configured_sink_usec = usec;
471
472         s->buffer_attr.maxlength = maxlength;
473         s->buffer_attr.fragsize = fragsize;
474         s->buffer_attr.tlength = tlength;
475         s->buffer_attr.prebuf = prebuf;
476         s->buffer_attr.minreq = minreq;
477     }
478
479     pa_xfree(s->device_name);
480     s->device_name = pa_xstrdup(dn);
481     s->device_index = di;
482
483     s->suspended = suspended;
484
485     check_smoother_status(s, TRUE, FALSE, FALSE);
486     request_auto_timing_update(s, TRUE);
487
488     if (s->moved_callback)
489         s->moved_callback(s, s->moved_userdata);
490
491 finish:
492     pa_context_unref(c);
493 }
494
495 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
496     pa_context *c = userdata;
497     pa_stream *s;
498     uint32_t channel;
499     pa_usec_t usec = 0;
500     uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
501
502     pa_assert(pd);
503     pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
504     pa_assert(t);
505     pa_assert(c);
506     pa_assert(PA_REFCNT_VALUE(c) >= 1);
507
508     pa_context_ref(c);
509
510     if (c->version < 15) {
511         pa_context_fail(c, PA_ERR_PROTOCOL);
512         goto finish;
513     }
514
515     if (pa_tagstruct_getu32(t, &channel) < 0) {
516         pa_context_fail(c, PA_ERR_PROTOCOL);
517         goto finish;
518     }
519
520     if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
521         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
522             pa_tagstruct_getu32(t, &fragsize) < 0 ||
523             pa_tagstruct_get_usec(t, &usec) < 0) {
524             pa_context_fail(c, PA_ERR_PROTOCOL);
525             goto finish;
526         }
527     } else {
528         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
529             pa_tagstruct_getu32(t, &tlength) < 0 ||
530             pa_tagstruct_getu32(t, &prebuf) < 0 ||
531             pa_tagstruct_getu32(t, &minreq) < 0 ||
532             pa_tagstruct_get_usec(t, &usec) < 0) {
533             pa_context_fail(c, PA_ERR_PROTOCOL);
534             goto finish;
535         }
536     }
537
538     if (!pa_tagstruct_eof(t)) {
539         pa_context_fail(c, PA_ERR_PROTOCOL);
540         goto finish;
541     }
542
543     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, channel)))
544         goto finish;
545
546     if (s->state != PA_STREAM_READY)
547         goto finish;
548
549     if (s->direction == PA_STREAM_RECORD)
550         s->timing_info.configured_source_usec = usec;
551     else
552         s->timing_info.configured_sink_usec = usec;
553
554     s->buffer_attr.maxlength = maxlength;
555     s->buffer_attr.fragsize = fragsize;
556     s->buffer_attr.tlength = tlength;
557     s->buffer_attr.prebuf = prebuf;
558     s->buffer_attr.minreq = minreq;
559
560     request_auto_timing_update(s, TRUE);
561
562     if (s->buffer_attr_callback)
563         s->buffer_attr_callback(s, s->buffer_attr_userdata);
564
565 finish:
566     pa_context_unref(c);
567 }
568
569 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
570     pa_context *c = userdata;
571     pa_stream *s;
572     uint32_t channel;
573     pa_bool_t suspended;
574
575     pa_assert(pd);
576     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
577     pa_assert(t);
578     pa_assert(c);
579     pa_assert(PA_REFCNT_VALUE(c) >= 1);
580
581     pa_context_ref(c);
582
583     if (c->version < 12) {
584         pa_context_fail(c, PA_ERR_PROTOCOL);
585         goto finish;
586     }
587
588     if (pa_tagstruct_getu32(t, &channel) < 0 ||
589         pa_tagstruct_get_boolean(t, &suspended) < 0 ||
590         !pa_tagstruct_eof(t)) {
591         pa_context_fail(c, PA_ERR_PROTOCOL);
592         goto finish;
593     }
594
595     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, channel)))
596         goto finish;
597
598     if (s->state != PA_STREAM_READY)
599         goto finish;
600
601     s->suspended = suspended;
602
603     check_smoother_status(s, TRUE, FALSE, FALSE);
604     request_auto_timing_update(s, TRUE);
605
606     if (s->suspended_callback)
607         s->suspended_callback(s, s->suspended_userdata);
608
609 finish:
610     pa_context_unref(c);
611 }
612
613 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
614     pa_context *c = userdata;
615     pa_stream *s;
616     uint32_t channel;
617
618     pa_assert(pd);
619     pa_assert(command == PA_COMMAND_STARTED);
620     pa_assert(t);
621     pa_assert(c);
622     pa_assert(PA_REFCNT_VALUE(c) >= 1);
623
624     pa_context_ref(c);
625
626     if (c->version < 13) {
627         pa_context_fail(c, PA_ERR_PROTOCOL);
628         goto finish;
629     }
630
631     if (pa_tagstruct_getu32(t, &channel) < 0 ||
632         !pa_tagstruct_eof(t)) {
633         pa_context_fail(c, PA_ERR_PROTOCOL);
634         goto finish;
635     }
636
637     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
638         goto finish;
639
640     if (s->state != PA_STREAM_READY)
641         goto finish;
642
643     check_smoother_status(s, TRUE, TRUE, FALSE);
644     request_auto_timing_update(s, TRUE);
645
646     if (s->started_callback)
647         s->started_callback(s, s->started_userdata);
648
649 finish:
650     pa_context_unref(c);
651 }
652
653 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
654     pa_context *c = userdata;
655     pa_stream *s;
656     uint32_t channel;
657     pa_proplist *pl = NULL;
658     const char *event;
659
660     pa_assert(pd);
661     pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
662     pa_assert(t);
663     pa_assert(c);
664     pa_assert(PA_REFCNT_VALUE(c) >= 1);
665
666     pa_context_ref(c);
667
668     if (c->version < 15) {
669         pa_context_fail(c, PA_ERR_PROTOCOL);
670         goto finish;
671     }
672
673     pl = pa_proplist_new();
674
675     if (pa_tagstruct_getu32(t, &channel) < 0 ||
676         pa_tagstruct_gets(t, &event) < 0 ||
677         pa_tagstruct_get_proplist(t, pl) < 0 ||
678         !pa_tagstruct_eof(t) || !event) {
679         pa_context_fail(c, PA_ERR_PROTOCOL);
680         goto finish;
681     }
682
683     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, channel)))
684         goto finish;
685
686     if (s->state != PA_STREAM_READY)
687         goto finish;
688
689     if (s->event_callback)
690         s->event_callback(s, event, pl, s->event_userdata);
691
692 finish:
693     pa_context_unref(c);
694
695     if (pl)
696         pa_proplist_free(pl);
697 }
698
699 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
700     pa_stream *s;
701     pa_context *c = userdata;
702     uint32_t bytes, channel;
703
704     pa_assert(pd);
705     pa_assert(command == PA_COMMAND_REQUEST);
706     pa_assert(t);
707     pa_assert(c);
708     pa_assert(PA_REFCNT_VALUE(c) >= 1);
709
710     pa_context_ref(c);
711
712     if (pa_tagstruct_getu32(t, &channel) < 0 ||
713         pa_tagstruct_getu32(t, &bytes) < 0 ||
714         !pa_tagstruct_eof(t)) {
715         pa_context_fail(c, PA_ERR_PROTOCOL);
716         goto finish;
717     }
718
719     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
720         goto finish;
721
722     if (s->state != PA_STREAM_READY)
723         goto finish;
724
725     s->requested_bytes += bytes;
726
727     if (s->requested_bytes > 0 && s->write_callback)
728         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
729
730 finish:
731     pa_context_unref(c);
732 }
733
734 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
735     pa_stream *s;
736     pa_context *c = userdata;
737     uint32_t channel;
738
739     pa_assert(pd);
740     pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
741     pa_assert(t);
742     pa_assert(c);
743     pa_assert(PA_REFCNT_VALUE(c) >= 1);
744
745     pa_context_ref(c);
746
747     if (pa_tagstruct_getu32(t, &channel) < 0 ||
748         !pa_tagstruct_eof(t)) {
749         pa_context_fail(c, PA_ERR_PROTOCOL);
750         goto finish;
751     }
752
753     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
754         goto finish;
755
756     if (s->state != PA_STREAM_READY)
757         goto finish;
758
759     if (s->buffer_attr.prebuf > 0)
760         check_smoother_status(s, TRUE, FALSE, TRUE);
761
762     request_auto_timing_update(s, TRUE);
763
764     if (command == PA_COMMAND_OVERFLOW) {
765         if (s->overflow_callback)
766             s->overflow_callback(s, s->overflow_userdata);
767     } else if (command == PA_COMMAND_UNDERFLOW) {
768         if (s->underflow_callback)
769             s->underflow_callback(s, s->underflow_userdata);
770     }
771
772  finish:
773     pa_context_unref(c);
774 }
775
776 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
777     pa_assert(s);
778     pa_assert(PA_REFCNT_VALUE(s) >= 1);
779
780 /*     pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
781
782     if (s->state != PA_STREAM_READY)
783         return;
784
785     if (w) {
786         s->write_index_not_before = s->context->ctag;
787
788         if (s->timing_info_valid)
789             s->timing_info.write_index_corrupt = TRUE;
790
791 /*         pa_log("write_index invalidated"); */
792     }
793
794     if (r) {
795         s->read_index_not_before = s->context->ctag;
796
797         if (s->timing_info_valid)
798             s->timing_info.read_index_corrupt = TRUE;
799
800 /*         pa_log("read_index invalidated"); */
801     }
802
803     request_auto_timing_update(s, TRUE);
804 }
805
806 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
807     pa_stream *s = userdata;
808
809     pa_assert(s);
810     pa_assert(PA_REFCNT_VALUE(s) >= 1);
811
812     pa_stream_ref(s);
813     request_auto_timing_update(s, FALSE);
814     pa_stream_unref(s);
815 }
816
817 static void create_stream_complete(pa_stream *s) {
818     pa_assert(s);
819     pa_assert(PA_REFCNT_VALUE(s) >= 1);
820     pa_assert(s->state == PA_STREAM_CREATING);
821
822     pa_stream_set_state(s, PA_STREAM_READY);
823
824     if (s->requested_bytes > 0 && s->write_callback)
825         s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
826
827     if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
828         s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
829         pa_assert(!s->auto_timing_update_event);
830         s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
831
832         request_auto_timing_update(s, TRUE);
833     }
834
835     check_smoother_status(s, TRUE, FALSE, FALSE);
836 }
837
838 static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_sample_spec *ss) {
839     pa_assert(s);
840     pa_assert(attr);
841     pa_assert(ss);
842
843     if (s->context->version >= 13)
844         return;
845
846     /* Version older than 0.9.10 didn't do server side buffer_attr
847      * selection, hence we have to fake it on the client side. */
848
849     /* We choose fairly conservative values here, to not confuse
850      * old clients with extremely large playback buffers */
851
852     if (attr->maxlength == (uint32_t) -1)
853         attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
854
855     if (attr->tlength == (uint32_t) -1)
856         attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
857
858     if (attr->minreq == (uint32_t) -1)
859         attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
860
861     if (attr->prebuf == (uint32_t) -1)
862         attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
863
864     if (attr->fragsize == (uint32_t) -1)
865         attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
866 }
867
868 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
869     pa_stream *s = userdata;
870     uint32_t requested_bytes;
871
872     pa_assert(pd);
873     pa_assert(s);
874     pa_assert(PA_REFCNT_VALUE(s) >= 1);
875     pa_assert(s->state == PA_STREAM_CREATING);
876
877     pa_stream_ref(s);
878
879     if (command != PA_COMMAND_REPLY) {
880         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
881             goto finish;
882
883         pa_stream_set_state(s, PA_STREAM_FAILED);
884         goto finish;
885     }
886
887     if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
888         s->channel == PA_INVALID_INDEX ||
889         ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 ||  s->stream_index == PA_INVALID_INDEX)) ||
890         ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
891         pa_context_fail(s->context, PA_ERR_PROTOCOL);
892         goto finish;
893     }
894
895     s->requested_bytes = (int64_t) requested_bytes;
896
897     if (s->context->version >= 9) {
898         if (s->direction == PA_STREAM_PLAYBACK) {
899             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
900                 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
901                 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
902                 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
903                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
904                 goto finish;
905             }
906         } else if (s->direction == PA_STREAM_RECORD) {
907             if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
908                 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
909                 pa_context_fail(s->context, PA_ERR_PROTOCOL);
910                 goto finish;
911             }
912         }
913     }
914
915     if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
916         pa_sample_spec ss;
917         pa_channel_map cm;
918         const char *dn = NULL;
919         pa_bool_t suspended;
920
921         if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
922             pa_tagstruct_get_channel_map(t, &cm) < 0 ||
923             pa_tagstruct_getu32(t, &s->device_index) < 0 ||
924             pa_tagstruct_gets(t, &dn) < 0 ||
925             pa_tagstruct_get_boolean(t, &suspended) < 0) {
926             pa_context_fail(s->context, PA_ERR_PROTOCOL);
927             goto finish;
928         }
929
930         if (!dn || s->device_index == PA_INVALID_INDEX ||
931             ss.channels != cm.channels ||
932             !pa_channel_map_valid(&cm) ||
933             !pa_sample_spec_valid(&ss) ||
934             (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
935             (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
936             (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))) {
937             pa_context_fail(s->context, PA_ERR_PROTOCOL);
938             goto finish;
939         }
940
941         pa_xfree(s->device_name);
942         s->device_name = pa_xstrdup(dn);
943         s->suspended = suspended;
944
945         s->channel_map = cm;
946         s->sample_spec = ss;
947     }
948
949     if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
950         pa_usec_t usec;
951
952         if (pa_tagstruct_get_usec(t, &usec) < 0) {
953             pa_context_fail(s->context, PA_ERR_PROTOCOL);
954             goto finish;
955         }
956
957         if (s->direction == PA_STREAM_RECORD)
958             s->timing_info.configured_source_usec = usec;
959         else
960             s->timing_info.configured_sink_usec = usec;
961     }
962
963     if (!pa_tagstruct_eof(t)) {
964         pa_context_fail(s->context, PA_ERR_PROTOCOL);
965         goto finish;
966     }
967
968     if (s->direction == PA_STREAM_RECORD) {
969         pa_assert(!s->record_memblockq);
970
971         s->record_memblockq = pa_memblockq_new(
972                 0,
973                 s->buffer_attr.maxlength,
974                 0,
975                 pa_frame_size(&s->sample_spec),
976                 1,
977                 0,
978                 0,
979                 NULL);
980     }
981
982     s->channel_valid = TRUE;
983     pa_dynarray_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, s->channel, s);
984
985     create_stream_complete(s);
986
987 finish:
988     pa_stream_unref(s);
989 }
990
991 static int create_stream(
992         pa_stream_direction_t direction,
993         pa_stream *s,
994         const char *dev,
995         const pa_buffer_attr *attr,
996         pa_stream_flags_t flags,
997         const pa_cvolume *volume,
998         pa_stream *sync_stream) {
999
1000     pa_tagstruct *t;
1001     uint32_t tag;
1002     pa_bool_t volume_set = FALSE;
1003
1004     pa_assert(s);
1005     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1006     pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1007
1008     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1009     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1010     PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1011     PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1012                                               PA_STREAM_INTERPOLATE_TIMING|
1013                                               PA_STREAM_NOT_MONOTONIC|
1014                                               PA_STREAM_AUTO_TIMING_UPDATE|
1015                                               PA_STREAM_NO_REMAP_CHANNELS|
1016                                               PA_STREAM_NO_REMIX_CHANNELS|
1017                                               PA_STREAM_FIX_FORMAT|
1018                                               PA_STREAM_FIX_RATE|
1019                                               PA_STREAM_FIX_CHANNELS|
1020                                               PA_STREAM_DONT_MOVE|
1021                                               PA_STREAM_VARIABLE_RATE|
1022                                               PA_STREAM_PEAK_DETECT|
1023                                               PA_STREAM_START_MUTED|
1024                                               PA_STREAM_ADJUST_LATENCY|
1025                                               PA_STREAM_EARLY_REQUESTS|
1026                                               PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1027                                               PA_STREAM_START_UNMUTED|
1028                                               PA_STREAM_FAIL_ON_SUSPEND)), PA_ERR_INVALID);
1029
1030     PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1031     PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1032     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1033     /* Althought some of the other flags are not supported on older
1034      * version, we don't check for them here, because it doesn't hurt
1035      * when they are passed but actually not supported. This makes
1036      * client development easier */
1037
1038     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1039     PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1040     PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
1041     PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1042     PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1043
1044     pa_stream_ref(s);
1045
1046     s->direction = direction;
1047     s->flags = flags;
1048     s->corked = !!(flags & PA_STREAM_START_CORKED);
1049
1050     if (sync_stream)
1051         s->syncid = sync_stream->syncid;
1052
1053     if (attr)
1054         s->buffer_attr = *attr;
1055     automatic_buffer_attr(s, &s->buffer_attr, &s->sample_spec);
1056
1057     if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1058         pa_usec_t x;
1059
1060         x = pa_rtclock_now();
1061
1062         pa_assert(!s->smoother);
1063         s->smoother = pa_smoother_new(
1064                 SMOOTHER_ADJUST_TIME,
1065                 SMOOTHER_HISTORY_TIME,
1066                 !(flags & PA_STREAM_NOT_MONOTONIC),
1067                 TRUE,
1068                 SMOOTHER_MIN_HISTORY,
1069                 x,
1070                 TRUE);
1071     }
1072
1073     if (!dev)
1074         dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1075
1076     t = pa_tagstruct_command(
1077             s->context,
1078             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1079             &tag);
1080
1081     if (s->context->version < 13)
1082         pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1083
1084     pa_tagstruct_put(
1085             t,
1086             PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1087             PA_TAG_CHANNEL_MAP, &s->channel_map,
1088             PA_TAG_U32, PA_INVALID_INDEX,
1089             PA_TAG_STRING, dev,
1090             PA_TAG_U32, s->buffer_attr.maxlength,
1091             PA_TAG_BOOLEAN, s->corked,
1092             PA_TAG_INVALID);
1093
1094     if (s->direction == PA_STREAM_PLAYBACK) {
1095         pa_cvolume cv;
1096
1097         pa_tagstruct_put(
1098                 t,
1099                 PA_TAG_U32, s->buffer_attr.tlength,
1100                 PA_TAG_U32, s->buffer_attr.prebuf,
1101                 PA_TAG_U32, s->buffer_attr.minreq,
1102                 PA_TAG_U32, s->syncid,
1103                 PA_TAG_INVALID);
1104
1105         volume_set = !!volume;
1106
1107         if (!volume)
1108             volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1109
1110         pa_tagstruct_put_cvolume(t, volume);
1111     } else
1112         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1113
1114     if (s->context->version >= 12) {
1115         pa_tagstruct_put(
1116                 t,
1117                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1118                 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1119                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1120                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1121                 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1122                 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1123                 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1124                 PA_TAG_INVALID);
1125     }
1126
1127     if (s->context->version >= 13) {
1128
1129         if (s->direction == PA_STREAM_PLAYBACK)
1130             pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1131         else
1132             pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1133
1134         pa_tagstruct_put(
1135                 t,
1136                 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1137                 PA_TAG_PROPLIST, s->proplist,
1138                 PA_TAG_INVALID);
1139
1140         if (s->direction == PA_STREAM_RECORD)
1141             pa_tagstruct_putu32(t, s->direct_on_input);
1142     }
1143
1144     if (s->context->version >= 14) {
1145
1146         if (s->direction == PA_STREAM_PLAYBACK)
1147             pa_tagstruct_put_boolean(t, volume_set);
1148
1149         pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1150     }
1151
1152     if (s->context->version >= 15) {
1153
1154         if (s->direction == PA_STREAM_PLAYBACK)
1155             pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1156
1157         pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1158         pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1159     }
1160
1161     pa_pstream_send_tagstruct(s->context->pstream, t);
1162     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1163
1164     pa_stream_set_state(s, PA_STREAM_CREATING);
1165
1166     pa_stream_unref(s);
1167     return 0;
1168 }
1169
1170 int pa_stream_connect_playback(
1171         pa_stream *s,
1172         const char *dev,
1173         const pa_buffer_attr *attr,
1174         pa_stream_flags_t flags,
1175         const pa_cvolume *volume,
1176         pa_stream *sync_stream) {
1177
1178     pa_assert(s);
1179     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1180
1181     return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1182 }
1183
1184 int pa_stream_connect_record(
1185         pa_stream *s,
1186         const char *dev,
1187         const pa_buffer_attr *attr,
1188         pa_stream_flags_t flags) {
1189
1190     pa_assert(s);
1191     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1192
1193     return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1194 }
1195
1196 int pa_stream_begin_write(
1197         pa_stream *s,
1198         void **data,
1199         size_t *nbytes) {
1200
1201     pa_assert(s);
1202     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1203
1204     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1205     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1206     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1207     PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1208     PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1209
1210     if (!s->write_memblock) {
1211         s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1212         s->write_data = pa_memblock_acquire(s->write_memblock);
1213     }
1214
1215     *data = s->write_data;
1216     *nbytes = pa_memblock_get_length(s->write_memblock);
1217
1218     return 0;
1219 }
1220
1221 int pa_stream_cancel_write(
1222         pa_stream *s) {
1223
1224     pa_assert(s);
1225     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1226
1227     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1228     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1229     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1230     PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1231
1232     pa_assert(s->write_data);
1233
1234     pa_memblock_release(s->write_memblock);
1235     pa_memblock_unref(s->write_memblock);
1236     s->write_memblock = NULL;
1237     s->write_data = NULL;
1238
1239     return 0;
1240 }
1241
1242 int pa_stream_write(
1243         pa_stream *s,
1244         const void *data,
1245         size_t length,
1246         pa_free_cb_t free_cb,
1247         int64_t offset,
1248         pa_seek_mode_t seek) {
1249
1250     pa_assert(s);
1251     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1252     pa_assert(data);
1253
1254     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1255     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1256     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1257     PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1258     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1259     PA_CHECK_VALIDITY(s->context,
1260                       !s->write_memblock ||
1261                       ((data >= s->write_data) &&
1262                        ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1263                       PA_ERR_INVALID);
1264     PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1265
1266     if (s->write_memblock) {
1267         pa_memchunk chunk;
1268
1269         /* pa_stream_write_begin() was called before */
1270
1271         pa_memblock_release(s->write_memblock);
1272
1273         chunk.memblock = s->write_memblock;
1274         chunk.index = (const char *) data - (const char *) s->write_data;
1275         chunk.length = length;
1276
1277         s->write_memblock = NULL;
1278         s->write_data = NULL;
1279
1280         pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1281         pa_memblock_unref(chunk.memblock);
1282
1283     } else {
1284         pa_seek_mode_t t_seek = seek;
1285         int64_t t_offset = offset;
1286         size_t t_length = length;
1287         const void *t_data = data;
1288
1289         /* pa_stream_write_begin() was not called before */
1290
1291         while (t_length > 0) {
1292             pa_memchunk chunk;
1293
1294             chunk.index = 0;
1295
1296             if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1297                 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1298                 chunk.length = t_length;
1299             } else {
1300                 void *d;
1301
1302                 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1303                 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1304
1305                 d = pa_memblock_acquire(chunk.memblock);
1306                 memcpy(d, t_data, chunk.length);
1307                 pa_memblock_release(chunk.memblock);
1308             }
1309
1310             pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1311
1312             t_offset = 0;
1313             t_seek = PA_SEEK_RELATIVE;
1314
1315             t_data = (const uint8_t*) t_data + chunk.length;
1316             t_length -= chunk.length;
1317
1318             pa_memblock_unref(chunk.memblock);
1319         }
1320
1321         if (free_cb && pa_pstream_get_shm(s->context->pstream))
1322             free_cb((void*) data);
1323     }
1324
1325     /* This is obviously wrong since we ignore the seeking index . But
1326      * that's OK, the server side applies the same error */
1327     s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1328
1329     if (s->direction == PA_STREAM_PLAYBACK) {
1330
1331         /* Update latency request correction */
1332         if (s->write_index_corrections[s->current_write_index_correction].valid) {
1333
1334             if (seek == PA_SEEK_ABSOLUTE) {
1335                 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1336                 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1337                 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1338             } else if (seek == PA_SEEK_RELATIVE) {
1339                 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1340                     s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1341             } else
1342                 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1343         }
1344
1345         /* Update the write index in the already available latency data */
1346         if (s->timing_info_valid) {
1347
1348             if (seek == PA_SEEK_ABSOLUTE) {
1349                 s->timing_info.write_index_corrupt = FALSE;
1350                 s->timing_info.write_index = offset + (int64_t) length;
1351             } else if (seek == PA_SEEK_RELATIVE) {
1352                 if (!s->timing_info.write_index_corrupt)
1353                     s->timing_info.write_index += offset + (int64_t) length;
1354             } else
1355                 s->timing_info.write_index_corrupt = TRUE;
1356         }
1357
1358         if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1359             request_auto_timing_update(s, TRUE);
1360     }
1361
1362     return 0;
1363 }
1364
1365 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1366     pa_assert(s);
1367     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1368     pa_assert(data);
1369     pa_assert(length);
1370
1371     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1372     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1373     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1374
1375     if (!s->peek_memchunk.memblock) {
1376
1377         if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1378             *data = NULL;
1379             *length = 0;
1380             return 0;
1381         }
1382
1383         s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1384     }
1385
1386     pa_assert(s->peek_data);
1387     *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1388     *length = s->peek_memchunk.length;
1389     return 0;
1390 }
1391
1392 int pa_stream_drop(pa_stream *s) {
1393     pa_assert(s);
1394     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1395
1396     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1397     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1398     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1399     PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1400
1401     pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1402
1403     /* Fix the simulated local read index */
1404     if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1405         s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1406
1407     pa_assert(s->peek_data);
1408     pa_memblock_release(s->peek_memchunk.memblock);
1409     pa_memblock_unref(s->peek_memchunk.memblock);
1410     pa_memchunk_reset(&s->peek_memchunk);
1411
1412     return 0;
1413 }
1414
1415 size_t pa_stream_writable_size(pa_stream *s) {
1416     pa_assert(s);
1417     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1418
1419     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1420     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1421     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1422
1423     return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1424 }
1425
1426 size_t pa_stream_readable_size(pa_stream *s) {
1427     pa_assert(s);
1428     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1429
1430     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1431     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1432     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1433
1434     return pa_memblockq_get_length(s->record_memblockq);
1435 }
1436
1437 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1438     pa_operation *o;
1439     pa_tagstruct *t;
1440     uint32_t tag;
1441
1442     pa_assert(s);
1443     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1444
1445     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1446     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1447     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1448
1449     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1450
1451     t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1452     pa_tagstruct_putu32(t, s->channel);
1453     pa_pstream_send_tagstruct(s->context->pstream, t);
1454     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1455
1456     return o;
1457 }
1458
1459 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1460     pa_usec_t usec;
1461
1462     pa_assert(s);
1463     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1464     pa_assert(s->state == PA_STREAM_READY);
1465     pa_assert(s->direction != PA_STREAM_UPLOAD);
1466     pa_assert(s->timing_info_valid);
1467     pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1468     pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1469
1470     if (s->direction == PA_STREAM_PLAYBACK) {
1471         /* The last byte that was written into the output device
1472          * had this time value associated */
1473         usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1474
1475         if (!s->corked && !s->suspended) {
1476
1477             if (!ignore_transport)
1478                 /* Because the latency info took a little time to come
1479                  * to us, we assume that the real output time is actually
1480                  * a little ahead */
1481                 usec += s->timing_info.transport_usec;
1482
1483             /* However, the output device usually maintains a buffer
1484                too, hence the real sample currently played is a little
1485                back  */
1486             if (s->timing_info.sink_usec >= usec)
1487                 usec = 0;
1488             else
1489                 usec -= s->timing_info.sink_usec;
1490         }
1491
1492     } else {
1493         pa_assert(s->direction == PA_STREAM_RECORD);
1494
1495         /* The last byte written into the server side queue had
1496          * this time value associated */
1497         usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1498
1499         if (!s->corked && !s->suspended) {
1500
1501             if (!ignore_transport)
1502                 /* Add transport latency */
1503                 usec += s->timing_info.transport_usec;
1504
1505             /* Add latency of data in device buffer */
1506             usec += s->timing_info.source_usec;
1507
1508             /* If this is a monitor source, we need to correct the
1509              * time by the playback device buffer */
1510             if (s->timing_info.sink_usec >= usec)
1511                 usec = 0;
1512             else
1513                 usec -= s->timing_info.sink_usec;
1514         }
1515     }
1516
1517     return usec;
1518 }
1519
1520 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1521     pa_operation *o = userdata;
1522     struct timeval local, remote, now;
1523     pa_timing_info *i;
1524     pa_bool_t playing = FALSE;
1525     uint64_t underrun_for = 0, playing_for = 0;
1526
1527     pa_assert(pd);
1528     pa_assert(o);
1529     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1530
1531     if (!o->context || !o->stream)
1532         goto finish;
1533
1534     i = &o->stream->timing_info;
1535
1536     o->stream->timing_info_valid = FALSE;
1537     i->write_index_corrupt = TRUE;
1538     i->read_index_corrupt = TRUE;
1539
1540     if (command != PA_COMMAND_REPLY) {
1541         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1542             goto finish;
1543
1544     } else {
1545
1546         if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1547             pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1548             pa_tagstruct_get_boolean(t, &playing) < 0 ||
1549             pa_tagstruct_get_timeval(t, &local) < 0 ||
1550             pa_tagstruct_get_timeval(t, &remote) < 0 ||
1551             pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1552             pa_tagstruct_gets64(t, &i->read_index) < 0) {
1553
1554             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1555             goto finish;
1556         }
1557
1558         if (o->context->version >= 13 &&
1559             o->stream->direction == PA_STREAM_PLAYBACK)
1560             if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1561                 pa_tagstruct_getu64(t, &playing_for) < 0) {
1562
1563                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1564                 goto finish;
1565             }
1566
1567
1568         if (!pa_tagstruct_eof(t)) {
1569             pa_context_fail(o->context, PA_ERR_PROTOCOL);
1570             goto finish;
1571         }
1572         o->stream->timing_info_valid = TRUE;
1573         i->write_index_corrupt = FALSE;
1574         i->read_index_corrupt = FALSE;
1575
1576         i->playing = (int) playing;
1577         i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1578
1579         pa_gettimeofday(&now);
1580
1581         /* Calculcate timestamps */
1582         if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1583             /* local and remote seem to have synchronized clocks */
1584
1585             if (o->stream->direction == PA_STREAM_PLAYBACK)
1586                 i->transport_usec = pa_timeval_diff(&remote, &local);
1587             else
1588                 i->transport_usec = pa_timeval_diff(&now, &remote);
1589
1590             i->synchronized_clocks = TRUE;
1591             i->timestamp = remote;
1592         } else {
1593             /* clocks are not synchronized, let's estimate latency then */
1594             i->transport_usec = pa_timeval_diff(&now, &local)/2;
1595             i->synchronized_clocks = FALSE;
1596             i->timestamp = local;
1597             pa_timeval_add(&i->timestamp, i->transport_usec);
1598         }
1599
1600         /* Invalidate read and write indexes if necessary */
1601         if (tag < o->stream->read_index_not_before)
1602             i->read_index_corrupt = TRUE;
1603
1604         if (tag < o->stream->write_index_not_before)
1605             i->write_index_corrupt = TRUE;
1606
1607         if (o->stream->direction == PA_STREAM_PLAYBACK) {
1608             /* Write index correction */
1609
1610             int n, j;
1611             uint32_t ctag = tag;
1612
1613             /* Go through the saved correction values and add up the
1614              * total correction.*/
1615             for (n = 0, j = o->stream->current_write_index_correction+1;
1616                  n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1617                  n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1618
1619                 /* Step over invalid data or out-of-date data */
1620                 if (!o->stream->write_index_corrections[j].valid ||
1621                     o->stream->write_index_corrections[j].tag < ctag)
1622                     continue;
1623
1624                 /* Make sure that everything is in order */
1625                 ctag = o->stream->write_index_corrections[j].tag+1;
1626
1627                 /* Now fix the write index */
1628                 if (o->stream->write_index_corrections[j].corrupt) {
1629                     /* A corrupting seek was made */
1630                     i->write_index_corrupt = TRUE;
1631                 } else if (o->stream->write_index_corrections[j].absolute) {
1632                     /* An absolute seek was made */
1633                     i->write_index = o->stream->write_index_corrections[j].value;
1634                     i->write_index_corrupt = FALSE;
1635                 } else if (!i->write_index_corrupt) {
1636                     /* A relative seek was made */
1637                     i->write_index += o->stream->write_index_corrections[j].value;
1638                 }
1639             }
1640
1641             /* Clear old correction entries */
1642             for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1643                 if (!o->stream->write_index_corrections[n].valid)
1644                     continue;
1645
1646                 if (o->stream->write_index_corrections[n].tag <= tag)
1647                     o->stream->write_index_corrections[n].valid = FALSE;
1648             }
1649         }
1650
1651         if (o->stream->direction == PA_STREAM_RECORD) {
1652             /* Read index correction */
1653
1654             if (!i->read_index_corrupt)
1655                 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1656         }
1657
1658         /* Update smoother */
1659         if (o->stream->smoother) {
1660             pa_usec_t u, x;
1661
1662             u = x = pa_rtclock_now() - i->transport_usec;
1663
1664             if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1665                 pa_usec_t su;
1666
1667                 /* If we weren't playing then it will take some time
1668                  * until the audio will actually come out through the
1669                  * speakers. Since we follow that timing here, we need
1670                  * to try to fix this up */
1671
1672                 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1673
1674                 if (su < i->sink_usec)
1675                     x += i->sink_usec - su;
1676             }
1677
1678             if (!i->playing)
1679                 pa_smoother_pause(o->stream->smoother, x);
1680
1681             /* Update the smoother */
1682             if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1683                 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1684                 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1685
1686             if (i->playing)
1687                 pa_smoother_resume(o->stream->smoother, x, TRUE);
1688         }
1689     }
1690
1691     o->stream->auto_timing_update_requested = FALSE;
1692
1693     if (o->stream->latency_update_callback)
1694         o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1695
1696     if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1697         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1698         cb(o->stream, o->stream->timing_info_valid, o->userdata);
1699     }
1700
1701 finish:
1702
1703     pa_operation_done(o);
1704     pa_operation_unref(o);
1705 }
1706
1707 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1708     uint32_t tag;
1709     pa_operation *o;
1710     pa_tagstruct *t;
1711     struct timeval now;
1712     int cidx = 0;
1713
1714     pa_assert(s);
1715     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1716
1717     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1718     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1719     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1720
1721     if (s->direction == PA_STREAM_PLAYBACK) {
1722         /* Find a place to store the write_index correction data for this entry */
1723         cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1724
1725         /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1726         PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1727     }
1728     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1729
1730     t = pa_tagstruct_command(
1731             s->context,
1732             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1733             &tag);
1734     pa_tagstruct_putu32(t, s->channel);
1735     pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1736
1737     pa_pstream_send_tagstruct(s->context->pstream, t);
1738     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1739
1740     if (s->direction == PA_STREAM_PLAYBACK) {
1741         /* Fill in initial correction data */
1742
1743         s->current_write_index_correction = cidx;
1744
1745         s->write_index_corrections[cidx].valid = TRUE;
1746         s->write_index_corrections[cidx].absolute = FALSE;
1747         s->write_index_corrections[cidx].corrupt = FALSE;
1748         s->write_index_corrections[cidx].tag = tag;
1749         s->write_index_corrections[cidx].value = 0;
1750     }
1751
1752     return o;
1753 }
1754
1755 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1756     pa_stream *s = userdata;
1757
1758     pa_assert(pd);
1759     pa_assert(s);
1760     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1761
1762     pa_stream_ref(s);
1763
1764     if (command != PA_COMMAND_REPLY) {
1765         if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1766             goto finish;
1767
1768         pa_stream_set_state(s, PA_STREAM_FAILED);
1769         goto finish;
1770     } else if (!pa_tagstruct_eof(t)) {
1771         pa_context_fail(s->context, PA_ERR_PROTOCOL);
1772         goto finish;
1773     }
1774
1775     pa_stream_set_state(s, PA_STREAM_TERMINATED);
1776
1777 finish:
1778     pa_stream_unref(s);
1779 }
1780
1781 int pa_stream_disconnect(pa_stream *s) {
1782     pa_tagstruct *t;
1783     uint32_t tag;
1784
1785     pa_assert(s);
1786     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1787
1788     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1789     PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1790     PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1791
1792     pa_stream_ref(s);
1793
1794     t = pa_tagstruct_command(
1795             s->context,
1796             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1797                         (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1798             &tag);
1799     pa_tagstruct_putu32(t, s->channel);
1800     pa_pstream_send_tagstruct(s->context->pstream, t);
1801     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1802
1803     pa_stream_unref(s);
1804     return 0;
1805 }
1806
1807 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1808     pa_assert(s);
1809     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1810
1811     if (pa_detect_fork())
1812         return;
1813
1814     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1815         return;
1816
1817     s->read_callback = cb;
1818     s->read_userdata = userdata;
1819 }
1820
1821 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1822     pa_assert(s);
1823     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1824
1825     if (pa_detect_fork())
1826         return;
1827
1828     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1829         return;
1830
1831     s->write_callback = cb;
1832     s->write_userdata = userdata;
1833 }
1834
1835 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1836     pa_assert(s);
1837     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1838
1839     if (pa_detect_fork())
1840         return;
1841
1842     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1843         return;
1844
1845     s->state_callback = cb;
1846     s->state_userdata = userdata;
1847 }
1848
1849 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1850     pa_assert(s);
1851     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1852
1853     if (pa_detect_fork())
1854         return;
1855
1856     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1857         return;
1858
1859     s->overflow_callback = cb;
1860     s->overflow_userdata = userdata;
1861 }
1862
1863 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1864     pa_assert(s);
1865     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1866
1867     if (pa_detect_fork())
1868         return;
1869
1870     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1871         return;
1872
1873     s->underflow_callback = cb;
1874     s->underflow_userdata = userdata;
1875 }
1876
1877 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1878     pa_assert(s);
1879     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1880
1881     if (pa_detect_fork())
1882         return;
1883
1884     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1885         return;
1886
1887     s->latency_update_callback = cb;
1888     s->latency_update_userdata = userdata;
1889 }
1890
1891 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1892     pa_assert(s);
1893     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1894
1895     if (pa_detect_fork())
1896         return;
1897
1898     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1899         return;
1900
1901     s->moved_callback = cb;
1902     s->moved_userdata = userdata;
1903 }
1904
1905 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1906     pa_assert(s);
1907     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1908
1909     if (pa_detect_fork())
1910         return;
1911
1912     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1913         return;
1914
1915     s->suspended_callback = cb;
1916     s->suspended_userdata = userdata;
1917 }
1918
1919 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1920     pa_assert(s);
1921     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1922
1923     if (pa_detect_fork())
1924         return;
1925
1926     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1927         return;
1928
1929     s->started_callback = cb;
1930     s->started_userdata = userdata;
1931 }
1932
1933 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
1934     pa_assert(s);
1935     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1936
1937     if (pa_detect_fork())
1938         return;
1939
1940     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1941         return;
1942
1943     s->event_callback = cb;
1944     s->event_userdata = userdata;
1945 }
1946
1947 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1948     pa_assert(s);
1949     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1950
1951     if (pa_detect_fork())
1952         return;
1953
1954     if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1955         return;
1956
1957     s->buffer_attr_callback = cb;
1958     s->buffer_attr_userdata = userdata;
1959 }
1960
1961 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1962     pa_operation *o = userdata;
1963     int success = 1;
1964
1965     pa_assert(pd);
1966     pa_assert(o);
1967     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1968
1969     if (!o->context)
1970         goto finish;
1971
1972     if (command != PA_COMMAND_REPLY) {
1973         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1974             goto finish;
1975
1976         success = 0;
1977     } else if (!pa_tagstruct_eof(t)) {
1978         pa_context_fail(o->context, PA_ERR_PROTOCOL);
1979         goto finish;
1980     }
1981
1982     if (o->callback) {
1983         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1984         cb(o->stream, success, o->userdata);
1985     }
1986
1987 finish:
1988     pa_operation_done(o);
1989     pa_operation_unref(o);
1990 }
1991
1992 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
1993     pa_operation *o;
1994     pa_tagstruct *t;
1995     uint32_t tag;
1996
1997     pa_assert(s);
1998     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1999
2000     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2001     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2002     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2003
2004     s->corked = b;
2005
2006     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2007
2008     t = pa_tagstruct_command(
2009             s->context,
2010             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2011             &tag);
2012     pa_tagstruct_putu32(t, s->channel);
2013     pa_tagstruct_put_boolean(t, !!b);
2014     pa_pstream_send_tagstruct(s->context->pstream, t);
2015     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2016
2017     check_smoother_status(s, FALSE, FALSE, FALSE);
2018
2019     /* This might cause the indexes to hang/start again, hence
2020      * let's request a timing update */
2021     request_auto_timing_update(s, TRUE);
2022
2023     return o;
2024 }
2025
2026 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2027     pa_tagstruct *t;
2028     pa_operation *o;
2029     uint32_t tag;
2030
2031     pa_assert(s);
2032     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2033
2034     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2035     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2036
2037     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2038
2039     t = pa_tagstruct_command(s->context, command, &tag);
2040     pa_tagstruct_putu32(t, s->channel);
2041     pa_pstream_send_tagstruct(s->context->pstream, t);
2042     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2043
2044     return o;
2045 }
2046
2047 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2048     pa_operation *o;
2049
2050     pa_assert(s);
2051     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2052
2053     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2054     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2055     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2056
2057     if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2058         return NULL;
2059
2060     if (s->direction == PA_STREAM_PLAYBACK) {
2061
2062         if (s->write_index_corrections[s->current_write_index_correction].valid)
2063             s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2064
2065         if (s->buffer_attr.prebuf > 0)
2066             check_smoother_status(s, FALSE, FALSE, TRUE);
2067
2068         /* This will change the write index, but leave the
2069          * read index untouched. */
2070         invalidate_indexes(s, FALSE, TRUE);
2071
2072     } else
2073         /* For record streams this has no influence on the write
2074          * index, but the read index might jump. */
2075         invalidate_indexes(s, TRUE, FALSE);
2076
2077     return o;
2078 }
2079
2080 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2081     pa_operation *o;
2082
2083     pa_assert(s);
2084     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2085
2086     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2087     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2088     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2089     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2090
2091     if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2092         return NULL;
2093
2094     /* This might cause the read index to hang again, hence
2095      * let's request a timing update */
2096     request_auto_timing_update(s, TRUE);
2097
2098     return o;
2099 }
2100
2101 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2102     pa_operation *o;
2103
2104     pa_assert(s);
2105     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2106
2107     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2108     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2109     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2110     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2111
2112     if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2113         return NULL;
2114
2115     /* This might cause the read index to start moving again, hence
2116      * let's request a timing update */
2117     request_auto_timing_update(s, TRUE);
2118
2119     return o;
2120 }
2121
2122 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2123     pa_operation *o;
2124
2125     pa_assert(s);
2126     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2127     pa_assert(name);
2128
2129     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2130     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2131     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2132
2133     if (s->context->version >= 13) {
2134         pa_proplist *p = pa_proplist_new();
2135
2136         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2137         o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2138         pa_proplist_free(p);
2139     } else {
2140         pa_tagstruct *t;
2141         uint32_t tag;
2142
2143         o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2144         t = pa_tagstruct_command(
2145                 s->context,
2146                 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2147                 &tag);
2148         pa_tagstruct_putu32(t, s->channel);
2149         pa_tagstruct_puts(t, name);
2150         pa_pstream_send_tagstruct(s->context->pstream, t);
2151         pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2152     }
2153
2154     return o;
2155 }
2156
2157 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2158     pa_usec_t usec;
2159
2160     pa_assert(s);
2161     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2162
2163     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2164     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2165     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2166     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2167     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2168     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2169
2170     if (s->smoother)
2171         usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2172     else
2173         usec = calc_time(s, FALSE);
2174
2175     /* Make sure the time runs monotonically */
2176     if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2177         if (usec < s->previous_time)
2178             usec = s->previous_time;
2179         else
2180             s->previous_time = usec;
2181     }
2182
2183     if (r_usec)
2184         *r_usec = usec;
2185
2186     return 0;
2187 }
2188
2189 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2190     pa_assert(s);
2191     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2192
2193     if (negative)
2194         *negative = 0;
2195
2196     if (a >= b)
2197         return a-b;
2198     else {
2199         if (negative && s->direction == PA_STREAM_RECORD) {
2200             *negative = 1;
2201             return b-a;
2202         } else
2203             return 0;
2204     }
2205 }
2206
2207 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2208     pa_usec_t t, c;
2209     int r;
2210     int64_t cindex;
2211
2212     pa_assert(s);
2213     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2214     pa_assert(r_usec);
2215
2216     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2217     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2218     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2219     PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2220     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2221     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2222
2223     if ((r = pa_stream_get_time(s, &t)) < 0)
2224         return r;
2225
2226     if (s->direction == PA_STREAM_PLAYBACK)
2227         cindex = s->timing_info.write_index;
2228     else
2229         cindex = s->timing_info.read_index;
2230
2231     if (cindex < 0)
2232         cindex = 0;
2233
2234     c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2235
2236     if (s->direction == PA_STREAM_PLAYBACK)
2237         *r_usec = time_counter_diff(s, c, t, negative);
2238     else
2239         *r_usec = time_counter_diff(s, t, c, negative);
2240
2241     return 0;
2242 }
2243
2244 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2245     pa_assert(s);
2246     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2247
2248     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2249     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2250     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2251     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2252
2253     return &s->timing_info;
2254 }
2255
2256 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2257     pa_assert(s);
2258     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2259
2260     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2261
2262     return &s->sample_spec;
2263 }
2264
2265 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2266     pa_assert(s);
2267     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2268
2269     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2270
2271     return &s->channel_map;
2272 }
2273
2274 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2275     pa_assert(s);
2276     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2277
2278     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2279     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2280     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2281
2282     return &s->buffer_attr;
2283 }
2284
2285 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2286     pa_operation *o = userdata;
2287     int success = 1;
2288
2289     pa_assert(pd);
2290     pa_assert(o);
2291     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2292
2293     if (!o->context)
2294         goto finish;
2295
2296     if (command != PA_COMMAND_REPLY) {
2297         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2298             goto finish;
2299
2300         success = 0;
2301     } else {
2302         if (o->stream->direction == PA_STREAM_PLAYBACK) {
2303             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2304                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2305                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2306                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2307                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2308                 goto finish;
2309             }
2310         } else if (o->stream->direction == PA_STREAM_RECORD) {
2311             if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2312                 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2313                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2314                 goto finish;
2315             }
2316         }
2317
2318         if (o->stream->context->version >= 13) {
2319             pa_usec_t usec;
2320
2321             if (pa_tagstruct_get_usec(t, &usec) < 0) {
2322                 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2323                 goto finish;
2324             }
2325
2326             if (o->stream->direction == PA_STREAM_RECORD)
2327                 o->stream->timing_info.configured_source_usec = usec;
2328             else
2329                 o->stream->timing_info.configured_sink_usec = usec;
2330         }
2331
2332         if (!pa_tagstruct_eof(t)) {
2333             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2334             goto finish;
2335         }
2336     }
2337
2338     if (o->callback) {
2339         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2340         cb(o->stream, success, o->userdata);
2341     }
2342
2343 finish:
2344     pa_operation_done(o);
2345     pa_operation_unref(o);
2346 }
2347
2348
2349 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2350     pa_operation *o;
2351     pa_tagstruct *t;
2352     uint32_t tag;
2353
2354     pa_assert(s);
2355     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2356     pa_assert(attr);
2357
2358     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2359     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2360     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2361     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2362
2363     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2364
2365     t = pa_tagstruct_command(
2366             s->context,
2367             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2368             &tag);
2369     pa_tagstruct_putu32(t, s->channel);
2370
2371     pa_tagstruct_putu32(t, attr->maxlength);
2372
2373     if (s->direction == PA_STREAM_PLAYBACK)
2374         pa_tagstruct_put(
2375                 t,
2376                 PA_TAG_U32, attr->tlength,
2377                 PA_TAG_U32, attr->prebuf,
2378                 PA_TAG_U32, attr->minreq,
2379                 PA_TAG_INVALID);
2380     else
2381         pa_tagstruct_putu32(t, attr->fragsize);
2382
2383     if (s->context->version >= 13)
2384         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2385
2386     if (s->context->version >= 14)
2387         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2388
2389     pa_pstream_send_tagstruct(s->context->pstream, t);
2390     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2391
2392     /* This might cause changes in the read/write indexex, hence let's
2393      * request a timing update */
2394     request_auto_timing_update(s, TRUE);
2395
2396     return o;
2397 }
2398
2399 uint32_t pa_stream_get_device_index(pa_stream *s) {
2400     pa_assert(s);
2401     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2402
2403     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2404     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2405     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2406     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2407     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2408
2409     return s->device_index;
2410 }
2411
2412 const char *pa_stream_get_device_name(pa_stream *s) {
2413     pa_assert(s);
2414     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2415
2416     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2417     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2418     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2419     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2420     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2421
2422     return s->device_name;
2423 }
2424
2425 int pa_stream_is_suspended(pa_stream *s) {
2426     pa_assert(s);
2427     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2428
2429     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2430     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2431     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2432     PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2433
2434     return s->suspended;
2435 }
2436
2437 int pa_stream_is_corked(pa_stream *s) {
2438     pa_assert(s);
2439     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2440
2441     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2442     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2443     PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2444
2445     return s->corked;
2446 }
2447
2448 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2449     pa_operation *o = userdata;
2450     int success = 1;
2451
2452     pa_assert(pd);
2453     pa_assert(o);
2454     pa_assert(PA_REFCNT_VALUE(o) >= 1);
2455
2456     if (!o->context)
2457         goto finish;
2458
2459     if (command != PA_COMMAND_REPLY) {
2460         if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2461             goto finish;
2462
2463         success = 0;
2464     } else {
2465
2466         if (!pa_tagstruct_eof(t)) {
2467             pa_context_fail(o->context, PA_ERR_PROTOCOL);
2468             goto finish;
2469         }
2470     }
2471
2472     o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2473     pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2474
2475     if (o->callback) {
2476         pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2477         cb(o->stream, success, o->userdata);
2478     }
2479
2480 finish:
2481     pa_operation_done(o);
2482     pa_operation_unref(o);
2483 }
2484
2485
2486 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2487     pa_operation *o;
2488     pa_tagstruct *t;
2489     uint32_t tag;
2490
2491     pa_assert(s);
2492     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2493
2494     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2495     PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2496     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2497     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2498     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2499     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2500
2501     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2502     o->private = PA_UINT_TO_PTR(rate);
2503
2504     t = pa_tagstruct_command(
2505             s->context,
2506             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2507             &tag);
2508     pa_tagstruct_putu32(t, s->channel);
2509     pa_tagstruct_putu32(t, rate);
2510
2511     pa_pstream_send_tagstruct(s->context->pstream, t);
2512     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2513
2514     return o;
2515 }
2516
2517 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2518     pa_operation *o;
2519     pa_tagstruct *t;
2520     uint32_t tag;
2521
2522     pa_assert(s);
2523     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2524
2525     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2526     PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2527     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2528     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2529     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2530
2531     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2532
2533     t = pa_tagstruct_command(
2534             s->context,
2535             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2536             &tag);
2537     pa_tagstruct_putu32(t, s->channel);
2538     pa_tagstruct_putu32(t, (uint32_t) mode);
2539     pa_tagstruct_put_proplist(t, p);
2540
2541     pa_pstream_send_tagstruct(s->context->pstream, t);
2542     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2543
2544     /* Please note that we don't update s->proplist here, because we
2545      * don't export that field */
2546
2547     return o;
2548 }
2549
2550 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2551     pa_operation *o;
2552     pa_tagstruct *t;
2553     uint32_t tag;
2554     const char * const*k;
2555
2556     pa_assert(s);
2557     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2558
2559     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2560     PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2561     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2562     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2563     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2564
2565     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2566
2567     t = pa_tagstruct_command(
2568             s->context,
2569             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2570             &tag);
2571     pa_tagstruct_putu32(t, s->channel);
2572
2573     for (k = keys; *k; k++)
2574         pa_tagstruct_puts(t, *k);
2575
2576     pa_tagstruct_puts(t, NULL);
2577
2578     pa_pstream_send_tagstruct(s->context->pstream, t);
2579     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2580
2581     /* Please note that we don't update s->proplist here, because we
2582      * don't export that field */
2583
2584     return o;
2585 }
2586
2587 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2588     pa_assert(s);
2589     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2590
2591     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2592     PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2593     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2594     PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2595
2596     s->direct_on_input = sink_input_idx;
2597
2598     return 0;
2599 }
2600
2601 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2602     pa_assert(s);
2603     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2604
2605     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2606     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2607     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2608
2609     return s->direct_on_input;
2610 }