3 #include <murphy/common/debug.h>
4 #include <murphy/common/log.h>
5 #include <murphy/common/mm.h>
6 #include <murphy/common/list.h>
7 #include <murphy/common/refcnt.h>
/*
 * String constants used by pulse_play_stream(): SPEECH is stored as the
 * PA_PROP_MEDIA_ROLE property of the stream, TTS is passed as the stream
 * name to pa_stream_new_with_proplist().
 */
11 #define SPEECH "speech"
12 #define TTS "text-to-speech"
/*
 * Members of the PulseAudio connection context that the functions in this
 * file access through their pulse_t pointers. The enclosing type
 * declaration itself is not part of this excerpt.
 */
15 pa_mainloop_api *pa; /* PA mainloop API */
16 char *name; /* PA context name */
17 pa_context *pc; /* PA context */
18 uint32_t strmid; /* next stream id */
19 mrp_list_hook_t streams; /* active streams */
20 int connected; /* whether connection is up */
/* created in start_reconnect(), freed in stop_reconnect()/connect_timer_cb() */
21 pa_time_event *reconn; /* reconnect timer */
/*
 * Members of a single playback stream as accessed through the stream_t
 * pointers in this file. The enclosing type declaration itself is not part
 * of this excerpt.
 */
26 pulse_t *p; /* our pulse_t context */
27 pa_stream *s; /* associated PA stream */
28 void *buf; /* pre-generated sample buffer */
/* size and offs are byte counts: size is set to 2 * nsample * nchannel */
29 size_t size; /* buffer size */
30 size_t offs; /* offset to next sample */
31 uint32_t msec; /* length in milliseconds */
32 int rate; /* sample rate */
33 int nchannel; /* number of channels */
34 uint32_t nsample; /* number of samples */
35 uint32_t id; /* our stream id */
/* both masks are tested against (1 << event) in stream_notify() */
36 int event_mask; /* mask of watched events */
37 int fired_mask; /* mask of delivered events */
38 pulse_stream_cb_t cb; /* notification callback */
39 void *user_data; /* callback user data */
40 mrp_list_hook_t hook; /* hook to list of streams */
41 mrp_refcnt_t refcnt; /* reference count */
/*
 * NOTE(review): whether a plain-int bit-field is signed is
 * implementation-defined; where it is signed, a 1-bit field cannot hold
 * the value 1. Consider 'unsigned int stopped : 1' - confirm how the flag
 * is set and tested.
 */
42 int stopped : 1; /* stopped marker */
43 pa_operation *drain; /* draining operation */
/*
 * Forward declarations of the PulseAudio context/stream callbacks and of the
 * stream helpers that are defined further down in this file.
 */
47 static void context_state_cb(pa_context *pc, void *user_data);
48 static void context_event_cb(pa_context *pc, pa_subscription_event_type_t e,
49 uint32_t idx, void *user_data);
50 static void stream_state_cb(pa_stream *s, void *user_data);
51 static void stream_write_cb(pa_stream *s, size_t size, void *user_data);
52 static void stream_drain_cb(pa_stream *ps, int success, void *user_data);
54 static void stream_drain(stream_t *s);
55 static void stream_notify(stream_t *s, srs_voice_event_type_t event);
/*
 * Create a PulseAudio connection context.
 *
 * Allocates a pulse_t, creates a PA context named after 'name' (or
 * "Winthorpe" when 'name' is NULL), installs the context state and
 * subscription callbacks and initiates the connection.
 *
 * pa:   PA mainloop API; the context is created on p->pa (the assignment
 *       of 'pa' to p->pa is not visible in this excerpt).
 * name: PA context name, may be NULL.
 *
 * Returns a pulse_t pointer; the return statements and error paths are not
 * visible in this excerpt.
 */
58 pulse_t *pulse_setup(pa_mainloop_api *pa, const char *name)
62 if ((p = mrp_allocz(sizeof(*p))) == NULL)
65 mrp_list_init(&p->streams);
/* NOTE(review): no check of the mrp_strdup() result is visible here */
67 p->name = name ? mrp_strdup(name) : mrp_strdup("Winthorpe");
68 p->pc = pa_context_new(p->pa, p->name);
/* both callbacks receive the pulse_t as their user_data */
78 pa_context_set_state_callback(p->pc, context_state_cb, p);
79 pa_context_set_subscribe_callback(p->pc, context_event_cb, p);
/* default server (NULL), PA_CONTEXT_NOFAIL flag; return value is not checked */
80 pa_context_connect(p->pc, NULL, PA_CONTEXT_NOFAIL, NULL);
/*
 * Tear down a context created by pulse_setup(). The visible part
 * disconnects the PA context; any further cleanup is not visible in this
 * excerpt.
 */
86 void pulse_cleanup(pulse_t *p)
89 pa_context_disconnect(p->pc);
/*
 * Destroy a stream: unlink it from the list it is hooked to, detach our
 * callbacks from the PA stream, then disconnect and unreference the PA
 * stream. Release of the stream_t memory itself is not visible in this
 * excerpt.
 */
98 static void stream_destroy(stream_t *s)
/* NOTE(review): s->id is uint32_t; other log calls in this file use %u */
100 mrp_debug("destroying stream #%d", s->id);
102 mrp_list_delete(&s->hook);
/* clear the callbacks first so PA no longer calls back with this stream_t */
105 pa_stream_set_state_callback(s->s, NULL, NULL);
106 pa_stream_set_write_callback(s->s, NULL, NULL);
107 pa_stream_disconnect(s->s);
108 pa_stream_unref(s->s);
/*
 * Take a reference on a stream via its 'refcnt' member and log the new
 * count. Returns a stream_t pointer (the return statement is not visible
 * in this excerpt).
 */
118 static inline stream_t *stream_ref(stream_t *s)
121 mrp_ref_obj(s, refcnt);
122 mrp_debug("stream reference count increased to %d", s->refcnt);
/*
 * Drop a reference on a stream via its 'refcnt' member.
 *
 * The statement executed when mrp_unref_obj() returns non-zero is not
 * visible in this excerpt (presumably the stream is destroyed - confirm);
 * the debug message logs the decreased count.
 */
129 static inline void stream_unref(stream_t *s)
131 if (mrp_unref_obj(s, refcnt))
134 mrp_debug("stream reference count decreased to %d", s->refcnt);
/*
 * Create a PA playback stream for a pre-generated sample buffer and start
 * connecting it.
 *
 * p:           connection context the stream is created on
 * sample_buf:  sample data (its storage into the stream_t is not visible
 *              in this excerpt)
 * sample_rate: sample rate, copied into the PA sample spec
 * nchannel:    number of channels, copied into the PA sample spec
 * nsample:     number of samples; byte size is 2 * nsample * nchannel
 * tags:        array of property strings terminated by a NULL entry, each
 *              passed to pa_proplist_setp()
 * event_mask:  mask of events the caller wants reported
 * cb:          notification callback (the parameter list continues past
 *              what is visible here; 'user_data' is used below)
 *
 * Returns a uint32_t (presumably the stream id - the return statements are
 * not visible in this excerpt).
 */
138 uint32_t pulse_play_stream(pulse_t *p, void *sample_buf, int sample_rate,
139 int nchannel, uint32_t nsample, char **tags,
140 int event_mask, pulse_stream_cb_t cb,
151 if ((s = mrp_allocz(sizeof(*s))) == NULL)
154 mrp_list_init(&s->hook);
155 mrp_refcnt_init(&s->refcnt);
158 if ((props = pa_proplist_new()) == NULL) {
163 pa_proplist_sets(props, PA_PROP_MEDIA_ROLE, SPEECH);
/*
 * NOTE(review): 'tags' is dereferenced here; a NULL guard, if any, is not
 * visible in this excerpt - confirm callers never pass NULL.
 */
165 for (t = tags; *t; t++)
166 pa_proplist_setp(props, *t);
/* sample spec: signed 16-bit little-endian at the caller's rate/channels */
171 memset(&ss, 0, sizeof(ss));
172 ss.format = PA_SAMPLE_S16LE;
173 ss.rate = sample_rate;
174 ss.channels = nchannel;
/* byte counts corresponding to 100 ms and 300 ms of audio in this format */
176 pamin = pa_usec_to_bytes(100 * PA_USEC_PER_MSEC, &ss);
177 pabuf = pa_usec_to_bytes(300 * PA_USEC_PER_MSEC, &ss);
185 s->s = pa_stream_new_with_proplist(p->pc, TTS, &ss, NULL, props);
187 pa_proplist_free(props);
197 s->rate = sample_rate;
198 s->nchannel = nchannel;
199 s->nsample = nsample;
/* 2 bytes per sample, matching PA_SAMPLE_S16LE above */
200 s->size = 2 * nsample * nchannel;
/* playback length in milliseconds, computed in floating point */
201 s->msec = (1.0 * nsample) / sample_rate * 1000;
203 s->user_data = user_data;
205 s->event_mask = event_mask;
/* both callbacks receive the stream_t as their user_data */
207 pa_stream_set_state_callback(s->s, stream_state_cb, s);
208 pa_stream_set_write_callback(s->s, stream_write_cb, s);
210 flags = PA_STREAM_ADJUST_LATENCY;
/*
 * 'ba' is set up outside the visible lines; the return value of the connect
 * call is not checked.
 */
211 pa_stream_connect_playback(s->s, NULL, &ba, flags, NULL, NULL);
/* track the stream in the context's list of streams */
213 mrp_list_append(&p->streams, &s->hook);
/*
 * Stop a stream.
 *
 * s:      stream to stop
 * drain:  drain flag (its handling is not visible in this excerpt)
 * notify: notification flag (its handling is not visible in this excerpt)
 *
 * The visible part reports PULSE_STREAM_COMPLETED when the whole buffer has
 * been consumed (offs >= size) and PULSE_STREAM_ABORTED otherwise, then
 * drops a reference on the stream.
 */
219 static void stream_stop(stream_t *s, int drain, int notify)
230 stream_notify(s, s->offs >= s->size ?
231 PULSE_STREAM_COMPLETED : PULSE_STREAM_ABORTED);
236 stream_unref(s); /* remove initial reference */
/*
 * Stop the stream identified by 'id' on context 'p'.
 *
 * Walks p->streams and calls stream_stop() with the caller's 'drain' and
 * 'notify' values. How the matching stream is selected and what is returned
 * are not visible in this excerpt.
 */
240 int pulse_stop_stream(pulse_t *p, uint32_t id, int drain, int notify)
242 mrp_list_hook_t *sp, *sn;
245 mrp_debug("stopping stream #%u", id);
248 mrp_list_foreach(&p->streams, sp, sn) {
/* map the list hook back to the entry that embeds it as 'hook' */
249 se = mrp_list_entry(sp, typeof(*se), hook);
262 stream_stop(s, drain, notify);
/*
 * Reconnect timer callback (registered by start_reconnect()).
 *
 * Drops the current PA context, creates a fresh one with the same name,
 * reinstalls the state/subscription callbacks, initiates a new connection
 * and frees the reconnect timer. 'user_data' is the pulse_t.
 */
268 static void connect_timer_cb(pa_mainloop_api *api, pa_time_event *e,
269 const struct timeval *tv, void *user_data)
271 pulse_t *p = (pulse_t *)user_data;
274 pa_context_unref(p->pc);
278 p->pc = pa_context_new(p->pa, p->name);
280 pa_context_set_state_callback(p->pc, context_state_cb, p);
281 pa_context_set_subscribe_callback(p->pc, context_event_cb, p);
282 pa_context_connect(p->pc, NULL, PA_CONTEXT_NOFAIL, NULL);
/* NOTE(review): resetting p->reconn after the free is not visible here */
284 p->pa->time_free(p->reconn);
/*
 * Cancel a pending reconnect: if a reconnect timer exists, free it through
 * the mainloop API. Anything done after the free is not visible in this
 * excerpt.
 */
289 static void stop_reconnect(pulse_t *p)
291 if (p->reconn != NULL) {
292 p->pa->time_free(p->reconn);
/*
 * Arm the reconnect timer: compute an expiry relative to the current time
 * and register connect_timer_cb() with the pulse_t as user data.
 */
298 static void start_reconnect(pulse_t *p)
/*
 * NOTE(review): pa_timeval_add() takes its increment in microseconds, so
 * 5000 is 5 ms - confirm this is the intended delay rather than 5 seconds.
 */
304 pa_timeval_add(pa_gettimeofday(&tv), 5000);
306 p->reconn = p->pa->time_new(p->pa, &tv, connect_timer_cb, p);
/*
 * PA context state callback; 'user_data' is the pulse_t.
 *
 * Logs each state transition and clears p->connected for every visible
 * state except PA_CONTEXT_READY. What else happens for READY, TERMINATED
 * and FAILED is not visible in this excerpt.
 */
310 static void context_state_cb(pa_context *pc, void *user_data)
312 pulse_t *p = (pulse_t *)user_data;
314 switch (pa_context_get_state(pc)) {
315 case PA_CONTEXT_CONNECTING:
316 mrp_debug("pulse: connection being established...");
317 p->connected = FALSE;
321 case PA_CONTEXT_AUTHORIZING:
322 mrp_debug("pulse: connection being authenticated...");
323 p->connected = FALSE;
326 case PA_CONTEXT_SETTING_NAME:
327 mrp_debug("pulse: setting connection name...");
328 p->connected = FALSE;
331 case PA_CONTEXT_READY:
332 mrp_log_info("pulse: connection up and ready");
336 case PA_CONTEXT_TERMINATED:
337 mrp_log_info("pulse: connection terminated");
338 p->connected = FALSE;
342 case PA_CONTEXT_FAILED:
/* NOTE(review): the log text below misspells "connection" */
343 mrp_log_error("pulse: connetion failed");
345 p->connected = FALSE;
/*
 * PA subscription event callback; 'user_data' is the pulse_t. The handling
 * of the event itself is not visible in this excerpt.
 */
352 static void context_event_cb(pa_context *pc, pa_subscription_event_type_t e,
353 uint32_t idx, void *user_data)
355 pulse_t *p = (pulse_t *)user_data;
/* NOTE(review): user_data is marked unused although it initialises 'p' above */
360 MRP_UNUSED(user_data);
/*
 * Deliver a stream event to the stream's notification callback.
 *
 * The event is turned into a bit mask (1 << event) and tested against the
 * stream's event_mask and, for events in PULSE_MASK_ONESHOT, against
 * fired_mask. The statements guarded by those tests are not visible in
 * this excerpt (presumably early returns - confirm). Progress data is
 * filled in per event type before the callback is invoked.
 */
368 static void stream_notify(stream_t *s, srs_voice_event_type_t event)
370 int mask = (1 << event);
/* no callback registered, or the caller is not watching this event */
373 if (s->cb == NULL || !(s->event_mask & mask))
/* one-shot event whose bit is already set in fired_mask */
376 if ((mask & PULSE_MASK_ONESHOT) && (s->fired_mask & mask))
383 case PULSE_STREAM_STARTED:
384 e.data.progress.pcnt = 0;
385 e.data.progress.msec = 0;
/*
 * Progress is the written fraction offs/size, scaled to percent and to the
 * stream length in milliseconds.
 * NOTE(review): an s->size of 0 would make these floating-point divisions
 * by zero - confirm size is always non-zero here.
 */
388 case PULSE_STREAM_PROGRESS:
389 e.data.progress.pcnt = ((1.0 * s->offs) / s->size) * 100;
390 e.data.progress.msec = ((1.0 * s->offs) / s->size) * s->msec;
393 case PULSE_STREAM_COMPLETED:
394 e.data.progress.pcnt = ((1.0 * s->offs) / s->size) * 100;
395 e.data.progress.msec = ((1.0 * s->offs) / s->size) * s->msec;
398 case PULSE_STREAM_ABORTED:
399 e.data.progress.pcnt = 0;
400 e.data.progress.msec = 0;
/* hand the event to the user with the owning context and their user data */
408 s->cb(s->p, &e, s->user_data);
/*
 * PA stream state callback; 'user_data' is the stream_t.
 *
 * First checks the state of the owning PA context ('p' is declared outside
 * the visible lines); what happens when the context is terminated or failed
 * is not visible in this excerpt. Then dispatches on the state of s->s:
 * READY reports PULSE_STREAM_STARTED, while TERMINATED/FAILED disconnect
 * the stream, detach our callbacks and report completion or abortion.
 */
413 static void stream_state_cb(pa_stream *ps, void *user_data)
415 stream_t *s = (stream_t *)user_data;
417 pa_context_state_t cst = pa_context_get_state(p->pc);
418 pa_stream_state_t sst;
420 if (cst == PA_CONTEXT_TERMINATED || cst == PA_CONTEXT_FAILED)
/* the state is read from s->s rather than from the 'ps' argument */
425 switch ((sst = pa_stream_get_state(s->s))) {
426 case PA_STREAM_CREATING:
427 mrp_debug("pulse: stream #%u being created", s->id);
430 case PA_STREAM_READY:
431 mrp_debug("pulse: stream #%u ready", s->id);
432 stream_notify(s, PULSE_STREAM_STARTED);
435 case PA_STREAM_TERMINATED:
436 case PA_STREAM_FAILED:
438 mrp_debug("pulse: stream #%u state %d", s->id, sst);
440 pa_stream_disconnect(s->s);
441 pa_stream_set_state_callback(s->s, NULL, NULL);
442 pa_stream_set_write_callback(s->s, NULL, NULL);
/*
 * TERMINATED is reported as completed; the ABORTED notification below is
 * presumably the else branch (the 'else' line is not visible here).
 */
444 if (sst == PA_STREAM_TERMINATED)
445 stream_notify(s, PULSE_STREAM_COMPLETED);
447 stream_notify(s, PULSE_STREAM_ABORTED);
/*
 * Start draining a stream unless a drain operation is already recorded in
 * s->drain. The operation returned by pa_stream_drain() is kept in s->drain
 * and stream_drain_cb() is called with the stream_t as its user data.
 */
454 static void stream_drain(stream_t *s)
456 if (s->drain == NULL) {
457 mrp_debug("pulse: stream #%u done, draining", s->id);
459 s->drain = pa_stream_drain(s->s, stream_drain_cb, s);
/*
 * Drain completion callback; 'user_data' is the stream_t.
 *
 * Logs the outcome, releases the drain operation and reports
 * PULSE_STREAM_COMPLETED. The notification is sent regardless of 'success'
 * as far as the visible lines show.
 */
464 static void stream_drain_cb(pa_stream *ps, int success, void *user_data)
466 stream_t *s = (stream_t *)user_data;
468 mrp_debug("pulse: stream #%u drained %s", s->id,
469 success ? "successfully" : "failed");
/* NOTE(review): resetting s->drain after the unref is not visible here */
471 pa_operation_unref(s->drain);
473 stream_notify(s, PULSE_STREAM_COMPLETED);
/*
 * PA write callback; 'size' is the byte count passed in by PulseAudio and
 * 'user_data' is the stream_t.
 *
 * Reports progress, detaches itself once the whole buffer has been written,
 * clamps the write to the remaining bytes and writes the next chunk from
 * s->buf at offset s->offs. Advancing s->offs and the conditions around the
 * final stream_stop() call are not visible in this excerpt.
 */
478 static void stream_write_cb(pa_stream *ps, size_t size, void *user_data)
480 stream_t *s = (stream_t *)user_data;
483 stream_notify(s, PULSE_STREAM_PROGRESS);
/* everything already written: stop asking for more data */
485 if (s->offs == s->size) {
486 pa_stream_set_write_callback(s->s, NULL, NULL);
/* never write past the end of the sample buffer */
492 if (s->offs + size >= s->size) {
493 size = s->size - s->offs;
/* NOTE(review): arithmetic on the void pointer s->buf is a GNU C extension */
499 if (pa_stream_write(s->s, s->buf + s->offs, size, NULL, 0,
500 PA_SEEK_RELATIVE) < 0) {
/*
 * NOTE(review): if the first argument (not visible here) is the size_t
 * 'size', the matching conversion is %zu rather than %zd.
 */
501 mrp_log_error("pulse: failed to write %zd bytes to stream #%u",
509 stream_stop(s, TRUE, TRUE);