2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
6 PulseAudio is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as published
8 by the Free Software Foundation; either version 2.1 of the License,
9 or (at your option) any later version.
11 PulseAudio is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 General Public License for more details.
16 You should have received a copy of the GNU Lesser General Public License
17 along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
29 #include <pulse/xmalloc.h>
31 #include <pulsecore/socket.h>
32 #include <pulsecore/core-error.h>
33 #include <pulsecore/core-util.h>
34 #include <pulsecore/log.h>
35 #include <pulsecore/macro.h>
36 #include <pulsecore/refcnt.h>
40 #define BUFFER_LIMIT (64*1024)
41 #define READ_SIZE (1024)
47 pa_defer_event *defer_event;
48 pa_mainloop_api *mainloop;
51 size_t wbuf_length, wbuf_index, wbuf_valid_length;
54 size_t rbuf_length, rbuf_index, rbuf_valid_length;
56 pa_ioline_cb_t callback;
59 pa_ioline_drain_cb_t drain_callback;
66 static void io_callback(pa_iochannel*io, void *userdata);
67 static void defer_callback(pa_mainloop_api*m, pa_defer_event*e, void *userdata);
69 pa_ioline* pa_ioline_new(pa_iochannel *io) {
73 l = pa_xnew(pa_ioline, 1);
78 l->wbuf_length = l->wbuf_index = l->wbuf_valid_length = 0;
81 l->rbuf_length = l->rbuf_index = l->rbuf_valid_length = 0;
86 l->drain_callback = NULL;
87 l->drain_userdata = NULL;
89 l->mainloop = pa_iochannel_get_mainloop_api(io);
91 l->defer_event = l->mainloop->defer_new(l->mainloop, defer_callback, l);
92 l->mainloop->defer_enable(l->defer_event, 0);
95 l->defer_close = false;
97 pa_iochannel_set_callback(io, io_callback, l);
102 static void ioline_free(pa_ioline *l) {
106 pa_iochannel_free(l->io);
109 l->mainloop->defer_free(l->defer_event);
116 void pa_ioline_unref(pa_ioline *l) {
118 pa_assert(PA_REFCNT_VALUE(l) >= 1);
120 if (PA_REFCNT_DEC(l) <= 0)
124 pa_ioline* pa_ioline_ref(pa_ioline *l) {
126 pa_assert(PA_REFCNT_VALUE(l) >= 1);
132 void pa_ioline_close(pa_ioline *l) {
134 pa_assert(PA_REFCNT_VALUE(l) >= 1);
139 pa_iochannel_free(l->io);
143 if (l->defer_event) {
144 l->mainloop->defer_free(l->defer_event);
145 l->defer_event = NULL;
152 void pa_ioline_puts(pa_ioline *l, const char *c) {
156 pa_assert(PA_REFCNT_VALUE(l) >= 1);
163 if (len > BUFFER_LIMIT - l->wbuf_valid_length)
164 len = BUFFER_LIMIT - l->wbuf_valid_length;
167 pa_assert(l->wbuf_length >= l->wbuf_valid_length);
169 /* In case the allocated buffer is too small, enlarge it. */
170 if (l->wbuf_valid_length + len > l->wbuf_length) {
171 size_t n = l->wbuf_valid_length+len;
172 char *new = pa_xnew(char, (unsigned) n);
175 memcpy(new, l->wbuf+l->wbuf_index, l->wbuf_valid_length);
182 } else if (l->wbuf_index + l->wbuf_valid_length + len > l->wbuf_length) {
184 /* In case the allocated buffer fits, but the current index is too far from the start, move it to the front. */
185 memmove(l->wbuf, l->wbuf+l->wbuf_index, l->wbuf_valid_length);
189 pa_assert(l->wbuf_index + l->wbuf_valid_length + len <= l->wbuf_length);
191 /* Append the new string */
192 memcpy(l->wbuf + l->wbuf_index + l->wbuf_valid_length, c, len);
193 l->wbuf_valid_length += len;
195 l->mainloop->defer_enable(l->defer_event, 1);
199 void pa_ioline_set_callback(pa_ioline*l, pa_ioline_cb_t callback, void *userdata) {
201 pa_assert(PA_REFCNT_VALUE(l) >= 1);
206 l->callback = callback;
207 l->userdata = userdata;
210 void pa_ioline_set_drain_callback(pa_ioline*l, pa_ioline_drain_cb_t callback, void *userdata) {
212 pa_assert(PA_REFCNT_VALUE(l) >= 1);
217 l->drain_callback = callback;
218 l->drain_userdata = userdata;
221 static void failure(pa_ioline *l, bool process_leftover) {
223 pa_assert(PA_REFCNT_VALUE(l) >= 1);
226 if (process_leftover && l->rbuf_valid_length > 0) {
227 /* Pass the last missing bit to the client */
230 char *p = pa_xstrndup(l->rbuf+l->rbuf_index, l->rbuf_valid_length);
231 l->callback(l, p, l->userdata);
237 l->callback(l, NULL, l->userdata);
244 static void scan_for_lines(pa_ioline *l, size_t skip) {
246 pa_assert(PA_REFCNT_VALUE(l) >= 1);
247 pa_assert(skip < l->rbuf_valid_length);
249 while (!l->dead && l->rbuf_valid_length > skip) {
253 if (!(e = memchr(l->rbuf + l->rbuf_index + skip, '\n', l->rbuf_valid_length - skip)))
258 p = l->rbuf + l->rbuf_index;
261 l->rbuf_index += m+1;
262 l->rbuf_valid_length -= m+1;
264 /* A shortcut for the next time */
265 if (l->rbuf_valid_length == 0)
269 l->callback(l, pa_strip_nl(p), l->userdata);
274 /* If the buffer became too large and still no newline was found, drop it. */
275 if (l->rbuf_valid_length >= BUFFER_LIMIT)
276 l->rbuf_index = l->rbuf_valid_length = 0;
279 static int do_write(pa_ioline *l);
281 static int do_read(pa_ioline *l) {
283 pa_assert(PA_REFCNT_VALUE(l) >= 1);
285 while (l->io && !l->dead && pa_iochannel_is_readable(l->io)) {
289 len = l->rbuf_length - l->rbuf_index - l->rbuf_valid_length;
291 /* Check if we have to enlarge the read buffer */
292 if (len < READ_SIZE) {
293 size_t n = l->rbuf_valid_length+READ_SIZE;
295 if (n >= BUFFER_LIMIT)
298 if (l->rbuf_length >= n) {
299 /* The current buffer is large enough, let's just move the data to the front */
300 if (l->rbuf_valid_length)
301 memmove(l->rbuf, l->rbuf+l->rbuf_index, l->rbuf_valid_length);
303 /* Enlarge the buffer */
304 char *new = pa_xnew(char, (unsigned) n);
305 if (l->rbuf_valid_length)
306 memcpy(new, l->rbuf+l->rbuf_index, l->rbuf_valid_length);
315 len = l->rbuf_length - l->rbuf_index - l->rbuf_valid_length;
317 pa_assert(len >= READ_SIZE);
320 if ((r = pa_iochannel_read(l->io, l->rbuf+l->rbuf_index+l->rbuf_valid_length, len)) <= 0) {
322 if (r < 0 && errno == EAGAIN)
325 if (r < 0 && errno != ECONNRESET) {
326 pa_log("read(): %s", pa_cstrerror(errno));
334 l->rbuf_valid_length += (size_t) r;
336 /* Look if a line has been terminated in the newly read data */
337 scan_for_lines(l, l->rbuf_valid_length - (size_t) r);
343 /* Try to flush the buffer */
344 static int do_write(pa_ioline *l) {
348 pa_assert(PA_REFCNT_VALUE(l) >= 1);
350 while (l->io && !l->dead && pa_iochannel_is_writable(l->io) && l->wbuf_valid_length > 0) {
352 if ((r = pa_iochannel_write(l->io, l->wbuf+l->wbuf_index, l->wbuf_valid_length)) < 0) {
355 pa_log("write(): %s", pa_cstrerror(errno));
362 l->wbuf_index += (size_t) r;
363 l->wbuf_valid_length -= (size_t) r;
365 /* A shortcut for the next time */
366 if (l->wbuf_valid_length == 0)
370 if (l->wbuf_valid_length <= 0 && l->drain_callback)
371 l->drain_callback(l, l->drain_userdata);
376 /* Try to flush read/write data */
377 static void do_work(pa_ioline *l) {
379 pa_assert(PA_REFCNT_VALUE(l) >= 1);
383 l->mainloop->defer_enable(l->defer_event, 0);
391 if (l->defer_close && !l->wbuf_valid_length)
397 static void io_callback(pa_iochannel*io, void *userdata) {
398 pa_ioline *l = userdata;
402 pa_assert(PA_REFCNT_VALUE(l) >= 1);
407 static void defer_callback(pa_mainloop_api*m, pa_defer_event*e, void *userdata) {
408 pa_ioline *l = userdata;
411 pa_assert(PA_REFCNT_VALUE(l) >= 1);
412 pa_assert(l->mainloop == m);
413 pa_assert(l->defer_event == e);
418 void pa_ioline_defer_close(pa_ioline *l) {
420 pa_assert(PA_REFCNT_VALUE(l) >= 1);
422 l->defer_close = true;
424 if (!l->wbuf_valid_length)
425 l->mainloop->defer_enable(l->defer_event, 1);
428 void pa_ioline_printf(pa_ioline *l, const char *format, ...) {
433 pa_assert(PA_REFCNT_VALUE(l) >= 1);
435 va_start(ap, format);
436 t = pa_vsprintf_malloc(format, ap);
439 pa_ioline_puts(l, t);
443 pa_iochannel* pa_ioline_detach_iochannel(pa_ioline *l) {
454 pa_iochannel_set_callback(r, NULL, NULL);
459 bool pa_ioline_is_drained(pa_ioline *l) {
462 return l->wbuf_valid_length <= 0;