3 #include <murphy/common/debug.h>
4 #include <murphy/common/log.h>
5 #include <murphy/common/mm.h>
6 #include <murphy/common/list.h>
7 #include <murphy/common/refcnt.h>
9 #include "festival-voice.h"
12 #define SPEECH "speech"
13 #define TTS "text-to-speech"
16 festival_t *f; /* festival voice context */
17 pa_mainloop_api *pa; /* PA mainloop API */
18 pa_context *pc; /* PA context */
19 uint32_t strmid; /* next stream id */
20 mrp_list_hook_t streams; /* active streams */
21 int connected; /* whether connection is up */
22 mrp_timer_t *reconn; /* reconnect timer */
27 pulse_t *p; /* our pulse_t context */
28 pa_stream *s; /* associated PA stream */
29 void *buf; /* pre-generated sample buffer */
30 size_t size; /* buffer size */
31 size_t offs; /* offset to next sample */
32 uint32_t msec; /* length in milliseconds */
33 int rate; /* sample rate */
34 int nchannel; /* number of channels */
35 uint32_t nsample; /* number of samples */
36 uint32_t id; /* our stream id */
37 int event_mask; /* mask of watched events */
38 int fired_mask; /* mask of delivered events */
39 pulse_stream_cb_t cb; /* notification callback */
40 void *user_data; /* callback user data */
41 mrp_list_hook_t hook; /* hook to list of streams */
42 mrp_refcnt_t refcnt; /* reference count */
43 int stopped : 1; /* stopped marker */
44 pa_operation *drain; /* draining operation */
48 static void context_state_cb(pa_context *pc, void *user_data);
49 static void context_event_cb(pa_context *pc, pa_subscription_event_type_t e,
50 uint32_t idx, void *user_data);
51 static void stream_state_cb(pa_stream *s, void *user_data);
52 static void stream_write_cb(pa_stream *s, size_t size, void *user_data);
53 static void stream_drain_cb(pa_stream *ps, int success, void *user_data);
55 static void stream_drain(stream_t *s);
56 static void stream_notify(stream_t *s, srs_voice_event_type_t event);
59 int pulse_setup(festival_t *f)
63 if ((p = mrp_allocz(sizeof(*p))) == NULL)
66 mrp_list_init(&p->streams);
69 p->pc = pa_context_new(p->pa, "festival");
80 pa_context_set_state_callback(p->pc, context_state_cb, p);
81 pa_context_set_subscribe_callback(p->pc, context_event_cb, p);
82 pa_context_connect(p->pc, NULL, PA_CONTEXT_NOFAIL, NULL);
88 void pulse_cleanup(festival_t *f)
90 pulse_t *p = (pulse_t *)f->pulse;
93 pa_context_disconnect(p->pc);
99 static void stream_destroy(stream_t *s)
101 mrp_debug("destroying stream #%d", s->id);
103 mrp_list_delete(&s->hook);
106 pa_stream_set_state_callback(s->s, NULL, NULL);
107 pa_stream_set_write_callback(s->s, NULL, NULL);
108 pa_stream_disconnect(s->s);
109 pa_stream_unref(s->s);
119 static inline stream_t *stream_ref(stream_t *s)
122 mrp_ref_obj(s, refcnt);
123 mrp_debug("stream reference count increased to %d", s->refcnt);
130 static inline void stream_unref(stream_t *s)
132 if (mrp_unref_obj(s, refcnt))
135 mrp_debug("stream reference count decreased to %d", s->refcnt);
139 uint32_t pulse_play_stream(festival_t *f, void *sample_buf, int sample_rate,
140 int nchannel, uint32_t nsample, char **tags,
141 int event_mask, pulse_stream_cb_t cb,
144 pulse_t *p = (pulse_t *)f->pulse;
153 if ((s = mrp_allocz(sizeof(*s))) == NULL)
156 mrp_list_init(&s->hook);
157 mrp_refcnt_init(&s->refcnt);
160 if ((props = pa_proplist_new()) == NULL) {
165 pa_proplist_sets(props, PA_PROP_MEDIA_ROLE, SPEECH);
167 for (t = tags; *t; t++)
168 pa_proplist_setp(props, *t);
173 memset(&ss, 0, sizeof(ss));
174 ss.format = PA_SAMPLE_S16LE;
175 ss.rate = sample_rate;
176 ss.channels = nchannel;
178 pamin = pa_usec_to_bytes(100 * PA_USEC_PER_MSEC, &ss);
179 pabuf = pa_usec_to_bytes(300 * PA_USEC_PER_MSEC, &ss);
187 s->s = pa_stream_new_with_proplist(p->pc, TTS, &ss, NULL, props);
189 pa_proplist_free(props);
199 s->rate = sample_rate;
200 s->nchannel = nchannel;
201 s->nsample = nsample;
202 s->size = 2 * nsample * nchannel;
203 s->msec = (1.0 * nsample) / sample_rate * 1000;
205 s->user_data = user_data;
207 s->event_mask = event_mask;
209 pa_stream_set_state_callback(s->s, stream_state_cb, s);
210 pa_stream_set_write_callback(s->s, stream_write_cb, s);
212 flags = PA_STREAM_ADJUST_LATENCY;
213 pa_stream_connect_playback(s->s, NULL, &ba, flags, NULL, NULL);
215 mrp_list_append(&p->streams, &s->hook);
221 static void stream_stop(stream_t *s, int drain, int notify)
232 stream_notify(s, s->offs >= s->size ?
233 PULSE_STREAM_COMPLETED : PULSE_STREAM_ABORTED);
238 stream_unref(s); /* remove intial reference */
242 int pulse_stop_stream(festival_t *f, uint32_t id, int drain, int notify)
244 pulse_t *p = (pulse_t *)f->pulse;
245 mrp_list_hook_t *sp, *sn;
248 mrp_debug("stopping stream #%u", id);
251 mrp_list_foreach(&p->streams, sp, sn) {
252 se = mrp_list_entry(sp, typeof(*se), hook);
265 stream_stop(s, drain, notify);
271 static void connect_timer_cb(mrp_timer_t *t, void *user_data)
273 pulse_t *p = (pulse_t *)user_data;
276 pa_context_unref(p->pc);
280 p->pc = pa_context_new(p->pa, "festival");
282 pa_context_set_state_callback(p->pc, context_state_cb, p);
283 pa_context_set_subscribe_callback(p->pc, context_event_cb, p);
284 pa_context_connect(p->pc, NULL, PA_CONTEXT_NOFAIL, NULL);
291 static void stop_reconnect(pulse_t *p)
293 if (p->reconn != NULL) {
294 mrp_del_timer(p->reconn);
300 static void start_reconnect(pulse_t *p)
304 p->reconn = mrp_add_timer(p->f->srs->ml, 5000, connect_timer_cb, p);
308 static void context_state_cb(pa_context *pc, void *user_data)
310 pulse_t *p = (pulse_t *)user_data;
312 switch (pa_context_get_state(pc)) {
313 case PA_CONTEXT_CONNECTING:
314 mrp_debug("PA connection: being established...");
315 p->connected = FALSE;
319 case PA_CONTEXT_AUTHORIZING:
320 mrp_debug("PA connection: being authenticated...");
321 p->connected = FALSE;
324 case PA_CONTEXT_SETTING_NAME:
325 mrp_debug("PA connection: setting name...");
326 p->connected = FALSE;
329 case PA_CONTEXT_READY:
330 mrp_log_info("festival: PA connection up and ready");
334 case PA_CONTEXT_TERMINATED:
335 mrp_log_info("festival: PA connection terminated");
336 p->connected = FALSE;
340 case PA_CONTEXT_FAILED:
341 mrp_log_error("festival: PA connetion failed");
343 p->connected = FALSE;
350 static void context_event_cb(pa_context *pc, pa_subscription_event_type_t e,
351 uint32_t idx, void *user_data)
353 pulse_t *p = (pulse_t *)user_data;
358 MRP_UNUSED(user_data);
366 static void stream_notify(stream_t *s, srs_voice_event_type_t event)
368 int mask = (1 << event);
371 if (s->cb == NULL || !(s->event_mask & mask))
374 if ((mask & PULSE_MASK_ONESHOT) && (s->fired_mask & mask))
381 case PULSE_STREAM_STARTED:
382 e.data.progress.pcnt = 0;
383 e.data.progress.msec = 0;
386 case PULSE_STREAM_PROGRESS:
387 e.data.progress.pcnt = ((1.0 * s->offs) / s->size) * 100;
388 e.data.progress.msec = ((1.0 * s->offs) / s->size) * s->msec;
391 case PULSE_STREAM_COMPLETED:
392 e.data.progress.pcnt = ((1.0 * s->offs) / s->size) * 100;
393 e.data.progress.msec = ((1.0 * s->offs) / s->size) * s->msec;
396 case PULSE_STREAM_ABORTED:
397 e.data.progress.pcnt = 0;
398 e.data.progress.msec = 0;
406 s->cb(s->p->f, &e, s->user_data);
411 static void stream_state_cb(pa_stream *ps, void *user_data)
413 stream_t *s = (stream_t *)user_data;
415 pa_context_state_t cst = pa_context_get_state(p->pc);
416 pa_stream_state_t sst;
418 if (cst == PA_CONTEXT_TERMINATED || cst == PA_CONTEXT_FAILED)
423 switch ((sst = pa_stream_get_state(s->s))) {
424 case PA_STREAM_CREATING:
425 mrp_debug("stream #%u being created", s->id);
428 case PA_STREAM_READY:
429 mrp_debug("stream #%u ready", s->id);
430 stream_notify(s, PULSE_STREAM_STARTED);
433 case PA_STREAM_TERMINATED:
434 case PA_STREAM_FAILED:
436 mrp_debug("stream #%u state %d", s->id, sst);
438 pa_stream_disconnect(s->s);
439 pa_stream_set_state_callback(s->s, NULL, NULL);
440 pa_stream_set_write_callback(s->s, NULL, NULL);
442 if (sst == PA_STREAM_TERMINATED)
443 stream_notify(s, PULSE_STREAM_COMPLETED);
445 stream_notify(s, PULSE_STREAM_ABORTED);
452 static void stream_drain(stream_t *s)
454 if (s->drain == NULL) {
455 mrp_debug("stream #%u done, draining", s->id);
457 s->drain = pa_stream_drain(s->s, stream_drain_cb, s);
462 static void stream_drain_cb(pa_stream *ps, int success, void *user_data)
464 stream_t *s = (stream_t *)user_data;
466 mrp_debug("stream #%u drained %s", s->id,
467 success ? "successfully" : "failed");
469 pa_operation_unref(s->drain);
471 stream_notify(s, PULSE_STREAM_COMPLETED);
476 static void stream_write_cb(pa_stream *ps, size_t size, void *user_data)
478 stream_t *s = (stream_t *)user_data;
481 stream_notify(s, PULSE_STREAM_PROGRESS);
483 if (s->offs == s->size) {
484 pa_stream_set_write_callback(s->s, NULL, NULL);
490 if (s->offs + size >= s->size) {
491 size = s->size - s->offs;
497 if (pa_stream_write(s->s, s->buf + s->offs, size, NULL, 0,
498 PA_SEEK_RELATIVE) < 0) {
499 mrp_log_error("festival: failed to write %zd bytes", size);
506 stream_stop(s, TRUE, TRUE);