daemon: first shot at a basic common pulse interface.
authorKrisztian Litkey <krisztian.litkey@intel.com>
Tue, 22 Apr 2014 11:49:54 +0000 (14:49 +0300)
committerKrisztian Litkey <krisztian.litkey@intel.com>
Tue, 22 Apr 2014 12:08:31 +0000 (15:08 +0300)
In an attempt to provide a common PA interface and prevent
plugin-specific ones spreading like wildfire I lifted and
massaged espeak/pulse over as a common one to daemon. In
its current form it's not versatile enough to migrate all
the plugins over (eg. it provides nothing for recording or
tracking sinks) but it should be enough already for the
voice backend plugins.

src/Makefile.am
src/daemon/context.h
src/daemon/daemon.c
src/daemon/pulse.c [new file with mode: 0644]
src/daemon/pulse.h [new file with mode: 0644]

index f5c1507..84f0faa 100644 (file)
@@ -18,7 +18,8 @@ QUIET_GEN          = $(Q:@=@echo '  GEN   '$@;)
 daemon_includedir  = $(includedir)/srs/daemon
 daemon_include_HEADERS = \
                daemon/client-api-types.h       \
-               daemon/voice-api-types.h
+               daemon/voice-api-types.h        \
+               daemon/pulse.h
 
 srs_daemon_SOURCES =                           \
                daemon/daemon.c                 \
@@ -28,7 +29,8 @@ srs_daemon_SOURCES =                          \
                daemon/plugin.c                 \
                daemon/audiobuf.c               \
                daemon/recognizer.c             \
-               daemon/voice.c
+               daemon/voice.c                  \
+               daemon/pulse.c
 
 srs_daemon_CFLAGS =                            \
                $(AM_CFLAGS)                    \
@@ -328,8 +330,7 @@ if ESPEAK_ENABLED
 plugin_LTLIBRARIES += plugin-espeak-voice.la
 
 plugin_espeak_voice_la_SOURCES =                               \
-               plugins/text-to-speech/espeak/espeak-voice.c    \
-               plugins/text-to-speech/espeak/pulse.c
+               plugins/text-to-speech/espeak/espeak-voice.c
 
 plugin_espeak_voice_la_CFLAGS  =                               \
                $(AM_CFLAGS)
index f5d2ae2..29f4e18 100644 (file)
@@ -42,6 +42,7 @@
 
 /** SRS daemon context type. */
 typedef struct srs_context_s srs_context_t;
+typedef struct srs_pulse_s   srs_pulse_t;
 
 #include "srs/daemon/config.h"
 #include "srs/daemon/resctl.h"
@@ -54,6 +55,7 @@ struct srs_context_s {
     GMainLoop         *gl;               /* GMainLoop if enabled and used */
     void              *pl;               /* PA (native or glib) mainloop */
     pa_mainloop_api   *pa;               /* PA mainloop API */
+    srs_pulse_t       *pulse;            /* audio stream interface */
     mrp_mainloop_t    *ml;               /* associated murphy mainloop */
     mrp_list_hook_t    clients;          /* connected clients */
     mrp_list_hook_t    plugins;          /* loaded plugins */
index bc6dc7d..c85c350 100644 (file)
@@ -42,6 +42,7 @@
 #include "srs/daemon/plugin.h"
 #include "srs/daemon/client.h"
 #include "srs/daemon/recognizer.h"
+#include "srs/daemon/pulse.h"
 
 static void cleanup_mainloop(srs_context_t *srs);
 static void resctl_state_change(srs_resctl_event_t *e, void *user_data);
@@ -50,6 +51,7 @@ static void cleanup_context(srs_context_t *srs)
 {
     if (srs != NULL) {
         srs_resctl_disconnect(srs);
+        srs_pulse_cleanup(srs->pulse);
         cleanup_mainloop(srs);
 
         /*
@@ -131,7 +133,9 @@ static void create_mainloop(srs_context_t *srs)
         srs->ml = mrp_mainloop_glib_get(srs->gl);
     }
 
-    if (srs->pa != NULL && srs->ml != NULL) {
+    srs->pulse = srs_pulse_setup(srs->pa, "SRS daemon");
+
+    if (srs->pa != NULL && srs->pulse != NULL && srs->ml != NULL) {
         if (srs_resctl_connect(srs, resctl_state_change, srs, TRUE))
             return;
     }
diff --git a/src/daemon/pulse.c b/src/daemon/pulse.c
new file mode 100644 (file)
index 0000000..812b12c
--- /dev/null
@@ -0,0 +1,516 @@
+#include <errno.h>
+
+#include <murphy/common/debug.h>
+#include <murphy/common/log.h>
+#include <murphy/common/mm.h>
+#include <murphy/common/list.h>
+#include <murphy/common/refcnt.h>
+
+#include "pulse.h"
+
+#define SPEECH "speech"
+#define TTS    "text-to-speech"
+
+struct srs_pulse_s {
+    pa_mainloop_api *pa;                 /* PA mainloop API */
+    char            *name;               /* PA context name */
+    pa_context      *pc;                 /* PA context */
+    uint32_t         strmid;             /* next stream id */
+    mrp_list_hook_t  streams;            /* active streams */
+    int              connected;          /* whether connection is up */
+    pa_time_event   *reconn;             /* reconnect timer */
+};
+
+
+typedef struct {
+    srs_pulse_t       *p;                /* our pulse_t context */
+    pa_stream         *s;                /* associated PA stream */
+    void              *buf;              /* pre-generated sample buffer */
+    size_t             size;             /* buffer size */
+    size_t             offs;             /* offset to next sample */
+    uint32_t           msec;             /* length in milliseconds */
+    int                rate;             /* sample rate */
+    int                nchannel;         /* number of channels */
+    uint32_t           nsample;          /* number of samples */
+    uint32_t           id;               /* our stream id */
+    int                event_mask;       /* mask of watched events */
+    int                fired_mask;       /* mask of delivered events */
+    srs_stream_cb_t    cb;               /* notification callback */
+    void              *user_data;        /* callback user data */
+    mrp_list_hook_t    hook;             /* hook to list of streams */
+    mrp_refcnt_t       refcnt;           /* reference count */
+    int                stopped : 1;      /* stopped marker */
+    pa_operation      *drain;            /* draining operation */
+} stream_t;
+
+
+static void context_state_cb(pa_context *pc, void *user_data);
+static void context_event_cb(pa_context *pc, pa_subscription_event_type_t e,
+                             uint32_t idx, void *user_data);
+static void stream_state_cb(pa_stream *s, void *user_data);
+static void stream_write_cb(pa_stream *s, size_t size, void *user_data);
+static void stream_drain_cb(pa_stream *ps, int success, void *user_data);
+
+static void stream_drain(stream_t *s);
+static void stream_notify(stream_t *s, srs_voice_event_type_t event);
+
+
+srs_pulse_t *srs_pulse_setup(pa_mainloop_api *pa, const char *name)
+{
+    srs_pulse_t *p;
+
+    if ((p = mrp_allocz(sizeof(*p))) == NULL)
+        return NULL;
+
+    mrp_list_init(&p->streams);
+    p->pa   = pa;
+    p->name = name ? mrp_strdup(name) : mrp_strdup("Winthorpe");
+    p->pc   = pa_context_new(p->pa, p->name);
+
+    if (p->pc == NULL) {
+        mrp_free(p);
+
+        return NULL;
+    }
+
+    mrp_log_info("pulse: trying to connect to server...");
+
+    p->strmid = 1;
+
+    pa_context_set_state_callback(p->pc, context_state_cb, p);
+    pa_context_set_subscribe_callback(p->pc, context_event_cb, p);
+    pa_context_connect(p->pc, NULL, PA_CONTEXT_NOFAIL, NULL);
+
+    return p;
+}
+
+
+void srs_pulse_cleanup(srs_pulse_t *p)
+{
+    if (p->pc != NULL) {
+        pa_context_disconnect(p->pc);
+        p->pc = NULL;
+        mrp_free(p->name);
+        p->name = NULL;
+        mrp_free(p);
+    }
+}
+
+
+static void stream_destroy(stream_t *s)
+{
+    mrp_debug("destroying stream #%d", s->id);
+
+    mrp_list_delete(&s->hook);
+
+    if (s->s != NULL) {
+        pa_stream_set_state_callback(s->s, NULL, NULL);
+        pa_stream_set_write_callback(s->s, NULL, NULL);
+        pa_stream_disconnect(s->s);
+        pa_stream_unref(s->s);
+        s->s = NULL;
+        mrp_free(s->buf);
+        s->buf = NULL;
+    }
+
+    mrp_free(s);
+}
+
+
+static inline stream_t *stream_ref(stream_t *s)
+{
+    if (s != NULL) {
+        mrp_ref_obj(s, refcnt);
+        mrp_debug("stream reference count increased to %d", s->refcnt);
+    }
+
+    return s;
+}
+
+
+static inline void stream_unref(stream_t *s)
+{
+    if (mrp_unref_obj(s, refcnt))
+        stream_destroy(s);
+    else
+        mrp_debug("stream reference count decreased to %d", s->refcnt);
+}
+
+
+uint32_t srs_play_stream(srs_pulse_t *p, void *sample_buf, int sample_rate,
+                         int nchannel, uint32_t nsample, char **tags,
+                         int event_mask, srs_stream_cb_t cb,
+                         void *user_data)
+{
+    char           **t;
+    stream_t        *s;
+    pa_sample_spec   ss;
+    pa_buffer_attr   ba;
+    pa_proplist     *props;
+    size_t           pamin, pabuf;
+    int              flags;
+
+    if ((s = mrp_allocz(sizeof(*s))) == NULL)
+        return 0;
+
+    mrp_list_init(&s->hook);
+    mrp_refcnt_init(&s->refcnt);
+
+    if (tags != NULL) {
+        if ((props = pa_proplist_new()) == NULL) {
+            mrp_free(s);
+            return 0;
+        }
+
+        pa_proplist_sets(props, PA_PROP_MEDIA_ROLE, SPEECH);
+
+        for (t = tags; *t; t++)
+            pa_proplist_setp(props, *t);
+    }
+    else
+        props = NULL;
+
+    memset(&ss, 0, sizeof(ss));
+    ss.format   = PA_SAMPLE_S16LE;
+    ss.rate     = sample_rate;
+    ss.channels = nchannel;
+
+    pamin  = pa_usec_to_bytes(100 * PA_USEC_PER_MSEC, &ss);
+    pabuf  = pa_usec_to_bytes(300 * PA_USEC_PER_MSEC, &ss);
+
+    ba.maxlength = -1;
+    ba.tlength   = pabuf;
+    ba.minreq    = pamin;
+    ba.prebuf    = pabuf;
+    ba.fragsize  = -1;
+
+    s->s = pa_stream_new_with_proplist(p->pc, TTS, &ss, NULL, props);
+    if (props != NULL)
+        pa_proplist_free(props);
+
+    if (s->s == NULL) {
+        mrp_free(s);
+        return 0;
+    }
+
+    s->p          = p;
+    s->buf        = sample_buf;
+    s->offs       = 0;
+    s->rate       = sample_rate;
+    s->nchannel   = nchannel;
+    s->nsample    = nsample;
+    s->size       = 2 * nsample * nchannel;
+    s->msec       = (1.0 * nsample) / sample_rate * 1000;
+    s->cb         = cb;
+    s->user_data  = user_data;
+    s->id         = p->strmid++;
+    s->event_mask = event_mask;
+
+    pa_stream_set_state_callback(s->s, stream_state_cb, s);
+    pa_stream_set_write_callback(s->s, stream_write_cb, s);
+
+    flags = PA_STREAM_ADJUST_LATENCY;
+    pa_stream_connect_playback(s->s, NULL, &ba, flags, NULL, NULL);
+
+    mrp_list_append(&p->streams, &s->hook);
+
+    return s->id;
+}
+
+
+static void stream_stop(stream_t *s, int drain, int notify)
+{
+    if (s->stopped)
+        return;
+    else
+        s->stopped = TRUE;
+
+    if (!notify)
+        s->event_mask = 0;
+
+    if (!drain) {
+        stream_notify(s, s->offs >= s->size ?
+                      SRS_STREAM_EVENT_COMPLETED : SRS_STREAM_EVENT_ABORTED);
+    }
+    else
+        stream_drain(s);
+
+    stream_unref(s);                     /* remove intial reference */
+}
+
+
+int srs_stop_stream(srs_pulse_t *p, uint32_t id, int drain, int notify)
+{
+    mrp_list_hook_t *sp, *sn;
+    stream_t        *se, *s;
+
+    mrp_debug("stopping stream #%u", id);
+
+    s = NULL;
+    mrp_list_foreach(&p->streams, sp, sn) {
+        se = mrp_list_entry(sp, typeof(*se), hook);
+
+        if (se->id == id) {
+            s = se;
+            break;
+        }
+    }
+
+    if (s == NULL) {
+        errno = ENOENT;
+        return -1;
+    }
+
+    stream_stop(s, drain, notify);
+
+    return 0;
+}
+
+
+static void connect_timer_cb(pa_mainloop_api *api, pa_time_event *e,
+                             const struct timeval *tv, void *user_data)
+{
+    srs_pulse_t *p = (srs_pulse_t *)user_data;
+
+    if (p->pc != NULL) {
+        pa_context_unref(p->pc);
+        p->pc = NULL;
+    }
+
+    p->pc = pa_context_new(p->pa, p->name);
+
+    pa_context_set_state_callback(p->pc, context_state_cb, p);
+    pa_context_set_subscribe_callback(p->pc, context_event_cb, p);
+    pa_context_connect(p->pc, NULL, PA_CONTEXT_NOFAIL, NULL);
+
+    p->pa->time_free(p->reconn);
+    p->reconn = NULL;
+}
+
+
+static void stop_reconnect(srs_pulse_t *p)
+{
+    if (p->reconn != NULL) {
+        p->pa->time_free(p->reconn);
+        p->reconn = NULL;
+    }
+}
+
+
+static void start_reconnect(srs_pulse_t *p)
+{
+    struct timeval tv;
+
+    stop_reconnect(p);
+
+    pa_timeval_add(pa_gettimeofday(&tv), 5000);
+
+    p->reconn = p->pa->time_new(p->pa, &tv, connect_timer_cb, p);
+}
+
+
+static void context_state_cb(pa_context *pc, void *user_data)
+{
+    srs_pulse_t *p = (srs_pulse_t *)user_data;
+
+    switch (pa_context_get_state(pc)) {
+    case PA_CONTEXT_CONNECTING:
+        mrp_debug("pulse: connection being established...");
+        p->connected = FALSE;
+        stop_reconnect(p);
+        break;
+
+    case PA_CONTEXT_AUTHORIZING:
+        mrp_debug("pulse: connection being authenticated...");
+        p->connected = FALSE;
+        break;
+
+    case PA_CONTEXT_SETTING_NAME:
+        mrp_debug("pulse: setting connection name...");
+        p->connected = FALSE;
+        break;
+
+    case PA_CONTEXT_READY:
+        mrp_log_info("pulse: connection up and ready");
+        p->connected = TRUE;
+        break;
+
+    case PA_CONTEXT_TERMINATED:
+        mrp_log_info("pulse: connection terminated");
+        p->connected = FALSE;
+        start_reconnect(p);
+        break;
+
+    case PA_CONTEXT_FAILED:
+        mrp_log_error("pulse: connetion failed");
+    default:
+        p->connected = FALSE;
+        start_reconnect(p);
+        break;
+    }
+}
+
+
+static void context_event_cb(pa_context *pc, pa_subscription_event_type_t e,
+                             uint32_t idx, void *user_data)
+{
+    srs_pulse_t *p = (srs_pulse_t *)user_data;
+
+    MRP_UNUSED(pc);
+    MRP_UNUSED(e);
+    MRP_UNUSED(idx);
+    MRP_UNUSED(user_data);
+
+    MRP_UNUSED(p);
+
+    return;
+}
+
+
+static void stream_notify(stream_t *s, srs_voice_event_type_t event)
+{
+    int                mask = (1 << event);
+    srs_voice_event_t  e;
+
+    if (s->cb == NULL || !(s->event_mask & mask))
+        return;
+
+    if ((mask & SRS_STREAM_MASK_ONESHOT) && (s->fired_mask & mask))
+        return;
+
+    e.type = event;
+    e.id   = s->id;
+
+    switch (event) {
+    case SRS_STREAM_EVENT_STARTED:
+        e.data.progress.pcnt = 0;
+        e.data.progress.msec = 0;
+        break;
+
+    case SRS_STREAM_EVENT_PROGRESS:
+        e.data.progress.pcnt = ((1.0 * s->offs) / s->size) * 100;
+        e.data.progress.msec = ((1.0 * s->offs) / s->size) * s->msec;
+        break;
+
+    case SRS_STREAM_EVENT_COMPLETED:
+        e.data.progress.pcnt = ((1.0 * s->offs) / s->size) * 100;
+        e.data.progress.msec = ((1.0 * s->offs) / s->size) * s->msec;
+        break;
+
+    case SRS_STREAM_EVENT_ABORTED:
+        e.data.progress.pcnt = 0;
+        e.data.progress.msec = 0;
+        break;
+
+    default:
+        return;
+    }
+
+    stream_ref(s);
+    s->cb(s->p, &e, s->user_data);
+    stream_unref(s);
+}
+
+
+static void stream_state_cb(pa_stream *ps, void *user_data)
+{
+    stream_t           *s   = (stream_t *)user_data;
+    srs_pulse_t        *p   = s->p;
+    pa_context_state_t  cst = pa_context_get_state(p->pc);
+    pa_stream_state_t   sst;
+
+    if (cst == PA_CONTEXT_TERMINATED || cst == PA_CONTEXT_FAILED)
+        return;
+
+    stream_ref(s);
+
+    switch ((sst = pa_stream_get_state(s->s))) {
+    case PA_STREAM_CREATING:
+        mrp_debug("pulse: stream #%u being created", s->id);
+        break;
+
+    case PA_STREAM_READY:
+        mrp_debug("pulse: stream #%u ready", s->id);
+        stream_notify(s, SRS_STREAM_EVENT_STARTED);
+        break;
+
+    case PA_STREAM_TERMINATED:
+    case PA_STREAM_FAILED:
+    default:
+        mrp_debug("pulse: stream #%u state %d", s->id, sst);
+
+        pa_stream_disconnect(s->s);
+        pa_stream_set_state_callback(s->s, NULL, NULL);
+        pa_stream_set_write_callback(s->s, NULL, NULL);
+
+        if (sst == PA_STREAM_TERMINATED)
+            stream_notify(s, SRS_STREAM_EVENT_COMPLETED);
+        else
+            stream_notify(s, SRS_STREAM_EVENT_ABORTED);
+    }
+
+    stream_unref(s);
+}
+
+
+static void stream_drain(stream_t *s)
+{
+    if (s->drain == NULL) {
+        mrp_debug("pulse: stream #%u done, draining", s->id);
+        stream_ref(s);
+        s->drain = pa_stream_drain(s->s, stream_drain_cb, s);
+    }
+}
+
+
+static void stream_drain_cb(pa_stream *ps, int success, void *user_data)
+{
+    stream_t *s = (stream_t *)user_data;
+
+    mrp_debug("pulse: stream #%u drained %s", s->id,
+              success ? "successfully" : "failed");
+
+    pa_operation_unref(s->drain);
+    s->drain = NULL;
+    stream_notify(s, SRS_STREAM_EVENT_COMPLETED);
+    stream_unref(s);
+}
+
+
+static void stream_write_cb(pa_stream *ps, size_t size, void *user_data)
+{
+    stream_t *s = (stream_t *)user_data;
+    int       done;
+
+    stream_notify(s, SRS_STREAM_EVENT_PROGRESS);
+
+    if (s->offs == s->size) {
+        pa_stream_set_write_callback(s->s, NULL, NULL);
+        return;
+    }
+
+    stream_ref(s);
+
+    if (s->offs + size >= s->size) {
+        size = s->size - s->offs;
+        done = TRUE;
+    }
+    else
+        done = FALSE;
+
+    if (pa_stream_write(s->s, s->buf + s->offs, size, NULL, 0,
+                        PA_SEEK_RELATIVE) < 0) {
+        mrp_log_error("pulse: failed to write %zd bytes to stream #%u",
+                      size, s->id);
+        goto out;
+    }
+    else {
+        s->offs += size;
+
+        if (done)
+            stream_stop(s, TRUE, TRUE);
+    }
+
+ out:
+    stream_unref(s);
+}
diff --git a/src/daemon/pulse.h b/src/daemon/pulse.h
new file mode 100644 (file)
index 0000000..88d2d61
--- /dev/null
@@ -0,0 +1,91 @@
+/*
+ * Copyright (c) 2012-2014, Intel Corporation
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *   * Redistributions of source code must retain the above copyright notice,
+ *     this list of conditions and the following disclaimer.
+ *   * Redistributions in binary form must reproduce the above copyright
+ *     notice, this list of conditions and the following disclaimer in the
+ *     documentation and/or other materials provided with the distribution.
+ *   * Neither the name of Intel Corporation nor the names of its contributors
+ *     may be used to endorse or promote products derived from this software
+ *     without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef __SRS_DAEMON_PULSE_H__
+#define __SRS_DAEMON_PULSE_H__
+
+#include <stdint.h>
+#include <murphy/common/macros.h>
+#include <pulse/pulseaudio.h>
+
+#include "srs/daemon/voice.h"
+
+MRP_CDECL_BEGIN
+
+/*
+ * PA stream events
+ */
+
+typedef enum {
+    SRS_STREAM_EVENT_NONE      = SRS_VOICE_EVENT_STARTED - 1,
+    SRS_STREAM_EVENT_STARTED   = SRS_VOICE_EVENT_STARTED,
+    SRS_STREAM_EVENT_PROGRESS  = SRS_VOICE_EVENT_PROGRESS,
+    SRS_STREAM_EVENT_COMPLETED = SRS_VOICE_EVENT_COMPLETED,
+    SRS_STREAM_EVENT_TIMEOUT   = SRS_VOICE_EVENT_TIMEOUT,
+    SRS_STREAM_EVENT_ABORTED   = SRS_VOICE_EVENT_ABORTED,
+    SRS_STREAM_EVENT_CORKED,
+    SRS_STREAM_EVENT_UNCORKED,
+} srs_stream_event_type_t;
+
+#define SRS_STREAM_MASK_NONE      SRS_VOICE_MASK_NONE
+#define SRS_STREAM_MASK_STARTED   SRS_VOICE_MASK_STARTED
+#define SRS_STREAM_MASK_PROGRESS  SRS_VOICE_MASK_PROGRESS
+#define SRS_STREAM_MASK_COMPLETED SRS_VOICE_MASK_COMPLETED
+#define SRS_STREAM_MASK_ABORTED   SRS_VOICE_MASK_ABORTED
+#define SRS_STREAM_MASK_CORKED    (1 << SRS_STREAM_EVENT_CORKED)
+#define SRS_STREAM_MASK_UNCORKED  (1 << SRS_STREAM_EVENT_UNCORKED)
+#define SRS_STREAM_MASK_ONESHOT   (~(SRS_STREAM_EVENT_PROGRESS))
+#define SRS_STREAM_MASK_ALL       (SRS_STREAM_MASK_STARTED   | \
+                                   SRS_STREAM_MASK_PROGRESS  | \
+                                   SRS_STREAM_MASK_COMPLETED | \
+                                   SRS_STREAM_MASK_ABORTED)
+
+typedef srs_voice_event_t srs_stream_event_t;
+
+typedef void (*srs_stream_cb_t)(srs_pulse_t *p, srs_stream_event_t *event,
+                                void *user_data);
+
+/** Set up the PulseAudio interface. */
+srs_pulse_t *srs_pulse_setup(pa_mainloop_api *pa, const char *name);
+
+/** Clean up the audio interface. */
+void srs_pulse_cleanup(srs_pulse_t *p);
+
+/** Render an stream (a buffer of audio samples). */
+uint32_t srs_play_stream(srs_pulse_t *p, void *sample_buf, int sample_rate,
+                         int nchannel, uint32_t nsample, char **tags,
+                         int event_mask, srs_stream_cb_t cb,
+                         void *user_data);
+
+/** Stop an ongoing stream. */
+int srs_stop_stream(srs_pulse_t *p, uint32_t id, int drain, int notify);
+
+MRP_CDECL_END
+
+#endif /* __SRS_DAEMON_PULSE_H__ */