be6dd5d58e506e8aea4c60eb6e999b11d4202e8d
[profile/ivi/speech-recognition.git] / src / plugins / text-to-speech / festival / pulse.c
1 #include <errno.h>
2
3 #include <murphy/common/debug.h>
4 #include <murphy/common/log.h>
5 #include <murphy/common/mm.h>
6 #include <murphy/common/list.h>
7 #include <murphy/common/refcnt.h>
8
9 #include "festival-voice.h"
10 #include "pulse.h"
11
/* value stored under PA_PROP_MEDIA_ROLE for the streams we create */
#define SPEECH "speech"
/* name passed to PulseAudio when creating a playback stream */
#define TTS    "text-to-speech"
14
15 typedef struct {
16     festival_t      *f;                  /* festival voice context */
17     pa_mainloop_api *pa;                 /* PA mainloop API */
18     pa_context      *pc;                 /* PA context */
19     uint32_t         strmid;             /* next stream id */
20     mrp_list_hook_t  streams;            /* active streams */
21     int              connected;          /* whether connection is up */
22     mrp_timer_t     *reconn;             /* reconnect timer */
23 } pulse_t;
24
25
26 typedef struct {
27     pulse_t           *p;                /* our pulse_t context */
28     pa_stream         *s;                /* associated PA stream */
29     void              *buf;              /* pre-generated sample buffer */
30     size_t             size;             /* buffer size */
31     size_t             offs;             /* offset to next sample */
32     uint32_t           msec;             /* length in milliseconds */
33     int                rate;             /* sample rate */
34     int                nchannel;         /* number of channels */
35     uint32_t           nsample;          /* number of samples */
36     uint32_t           id;               /* our stream id */
37     int                event_mask;       /* mask of watched events */
38     int                fired_mask;       /* mask of delivered events */
39     pulse_stream_cb_t  cb;               /* notification callback */
40     void              *user_data;        /* callback user data */
41     mrp_list_hook_t    hook;             /* hook to list of streams */
42     mrp_refcnt_t       refcnt;           /* reference count */
43     int                stopped : 1;      /* stopped marker */
44     pa_operation      *drain;            /* draining operation */
45 } stream_t;
46
47
48 static void context_state_cb(pa_context *pc, void *user_data);
49 static void context_event_cb(pa_context *pc, pa_subscription_event_type_t e,
50                              uint32_t idx, void *user_data);
51 static void stream_state_cb(pa_stream *s, void *user_data);
52 static void stream_write_cb(pa_stream *s, size_t size, void *user_data);
53 static void stream_drain_cb(pa_stream *ps, int success, void *user_data);
54
55 static void stream_drain(stream_t *s);
56 static void stream_notify(stream_t *s, srs_voice_event_type_t event);
57
58
59 int pulse_setup(festival_t *f)
60 {
61     pulse_t *p;
62
63     if ((p = mrp_allocz(sizeof(*p))) == NULL)
64         return -1;
65
66     mrp_list_init(&p->streams);
67     p->f  = f;
68     p->pa = f->srs->pa;
69     p->pc = pa_context_new(p->pa, "festival");
70
71     if (p->pc == NULL) {
72         mrp_free(p);
73
74         return -1;
75     }
76
77     f->pulse  = p;
78     p->strmid = 1;
79
80     pa_context_set_state_callback(p->pc, context_state_cb, p);
81     pa_context_set_subscribe_callback(p->pc, context_event_cb, p);
82     pa_context_connect(p->pc, NULL, PA_CONTEXT_NOFAIL, NULL);
83
84     return 0;
85 }
86
87
88 void pulse_cleanup(festival_t *f)
89 {
90     pulse_t *p = (pulse_t *)f->pulse;
91
92     if (p->pc != NULL) {
93         pa_context_disconnect(p->pc);
94         p->pc = NULL;
95     }
96 }
97
98
99 static void stream_destroy(stream_t *s)
100 {
101     mrp_debug("destroying stream #%d", s->id);
102
103     mrp_list_delete(&s->hook);
104
105     if (s->s != NULL) {
106         pa_stream_set_state_callback(s->s, NULL, NULL);
107         pa_stream_set_write_callback(s->s, NULL, NULL);
108         pa_stream_disconnect(s->s);
109         pa_stream_unref(s->s);
110         s->s = NULL;
111         mrp_free(s->buf);
112         s->buf = NULL;
113     }
114
115     mrp_free(s);
116 }
117
118
119 static inline stream_t *stream_ref(stream_t *s)
120 {
121     if (s != NULL) {
122         mrp_ref_obj(s, refcnt);
123         mrp_debug("stream reference count increased to %d", s->refcnt);
124     }
125
126     return s;
127 }
128
129
130 static inline void stream_unref(stream_t *s)
131 {
132     if (mrp_unref_obj(s, refcnt))
133         stream_destroy(s);
134     else
135         mrp_debug("stream reference count decreased to %d", s->refcnt);
136 }
137
138
139 uint32_t pulse_play_stream(festival_t *f, void *sample_buf, int sample_rate,
140                            int nchannel, uint32_t nsample, char **tags,
141                            int event_mask, pulse_stream_cb_t cb,
142                            void *user_data)
143 {
144     pulse_t         *p    = (pulse_t *)f->pulse;
145     char           **t;
146     stream_t        *s;
147     pa_sample_spec   ss;
148     pa_buffer_attr   ba;
149     pa_proplist     *props;
150     size_t           pamin, pabuf;
151     int              flags;
152
153     if ((s = mrp_allocz(sizeof(*s))) == NULL)
154         return 0;
155
156     mrp_list_init(&s->hook);
157     mrp_refcnt_init(&s->refcnt);
158
159     if (tags != NULL) {
160         if ((props = pa_proplist_new()) == NULL) {
161             mrp_free(s);
162             return 0;
163         }
164
165         pa_proplist_sets(props, PA_PROP_MEDIA_ROLE, SPEECH);
166
167         for (t = tags; *t; t++)
168             pa_proplist_setp(props, *t);
169     }
170     else
171         props = NULL;
172
173     memset(&ss, 0, sizeof(ss));
174     ss.format   = PA_SAMPLE_S16LE;
175     ss.rate     = sample_rate;
176     ss.channels = nchannel;
177
178     pamin  = pa_usec_to_bytes(100 * PA_USEC_PER_MSEC, &ss);
179     pabuf  = pa_usec_to_bytes(300 * PA_USEC_PER_MSEC, &ss);
180
181     ba.maxlength = -1;
182     ba.tlength   = pabuf;
183     ba.minreq    = pamin;
184     ba.prebuf    = pabuf;
185     ba.fragsize  = -1;
186
187     s->s = pa_stream_new_with_proplist(p->pc, TTS, &ss, NULL, props);
188     if (props != NULL)
189         pa_proplist_free(props);
190
191     if (s->s == NULL) {
192         mrp_free(s);
193         return 0;
194     }
195
196     s->p          = p;
197     s->buf        = sample_buf;
198     s->offs       = 0;
199     s->rate       = sample_rate;
200     s->nchannel   = nchannel;
201     s->nsample    = nsample;
202     s->size       = 2 * nsample * nchannel;
203     s->msec       = (1.0 * nsample) / sample_rate * 1000;
204     s->cb         = cb;
205     s->user_data  = user_data;
206     s->id         = p->strmid++;
207     s->event_mask = event_mask;
208
209     pa_stream_set_state_callback(s->s, stream_state_cb, s);
210     pa_stream_set_write_callback(s->s, stream_write_cb, s);
211
212     flags = PA_STREAM_ADJUST_LATENCY;
213     pa_stream_connect_playback(s->s, NULL, &ba, flags, NULL, NULL);
214
215     mrp_list_append(&p->streams, &s->hook);
216
217     return s->id;
218 }
219
220
221 static void stream_stop(stream_t *s, int drain, int notify)
222 {
223     if (s->stopped)
224         return;
225     else
226         s->stopped = TRUE;
227
228     if (!notify)
229         s->event_mask = 0;
230
231     if (!drain) {
232         stream_notify(s, s->offs >= s->size ?
233                       PULSE_STREAM_COMPLETED : PULSE_STREAM_ABORTED);
234     }
235     else
236         stream_drain(s);
237
238     stream_unref(s);                     /* remove intial reference */
239 }
240
241
242 int pulse_stop_stream(festival_t *f, uint32_t id, int drain, int notify)
243 {
244     pulse_t         *p = (pulse_t *)f->pulse;
245     mrp_list_hook_t *sp, *sn;
246     stream_t        *se, *s;
247
248     mrp_debug("stopping stream #%u", id);
249
250     s = NULL;
251     mrp_list_foreach(&p->streams, sp, sn) {
252         se = mrp_list_entry(sp, typeof(*se), hook);
253
254         if (se->id == id) {
255             s = se;
256             break;
257         }
258     }
259
260     if (s == NULL) {
261         errno = ENOENT;
262         return -1;
263     }
264
265     stream_stop(s, drain, notify);
266
267     return 0;
268 }
269
270
271 static void connect_timer_cb(mrp_timer_t *t, void *user_data)
272 {
273     pulse_t *p = (pulse_t *)user_data;
274
275     if (p->pc != NULL) {
276         pa_context_unref(p->pc);
277         p->pc = NULL;
278     }
279
280     p->pc = pa_context_new(p->pa, "festival");
281
282     pa_context_set_state_callback(p->pc, context_state_cb, p);
283     pa_context_set_subscribe_callback(p->pc, context_event_cb, p);
284     pa_context_connect(p->pc, NULL, PA_CONTEXT_NOFAIL, NULL);
285
286     p->reconn = NULL;
287     mrp_del_timer(t);
288 }
289
290
291 static void stop_reconnect(pulse_t *p)
292 {
293     if (p->reconn != NULL) {
294         mrp_del_timer(p->reconn);
295         p->reconn = NULL;
296     }
297 }
298
299
300 static void start_reconnect(pulse_t *p)
301 {
302     stop_reconnect(p);
303
304     p->reconn = mrp_add_timer(p->f->srs->ml, 5000, connect_timer_cb, p);
305 }
306
307
308 static void context_state_cb(pa_context *pc, void *user_data)
309 {
310     pulse_t *p = (pulse_t *)user_data;
311
312     switch (pa_context_get_state(pc)) {
313     case PA_CONTEXT_CONNECTING:
314         mrp_debug("PA connection: being established...");
315         p->connected = FALSE;
316         stop_reconnect(p);
317         break;
318
319     case PA_CONTEXT_AUTHORIZING:
320         mrp_debug("PA connection: being authenticated...");
321         p->connected = FALSE;
322         break;
323
324     case PA_CONTEXT_SETTING_NAME:
325         mrp_debug("PA connection: setting name...");
326         p->connected = FALSE;
327         break;
328
329     case PA_CONTEXT_READY:
330         mrp_log_info("festival: PA connection up and ready");
331         p->connected = TRUE;
332         break;
333
334     case PA_CONTEXT_TERMINATED:
335         mrp_log_info("festival: PA connection terminated");
336         p->connected = FALSE;
337         start_reconnect(p);
338         break;
339
340     case PA_CONTEXT_FAILED:
341         mrp_log_error("festival: PA connetion failed");
342     default:
343         p->connected = FALSE;
344         start_reconnect(p);
345         break;
346     }
347 }
348
349
350 static void context_event_cb(pa_context *pc, pa_subscription_event_type_t e,
351                              uint32_t idx, void *user_data)
352 {
353     pulse_t *p = (pulse_t *)user_data;
354
355     MRP_UNUSED(pc);
356     MRP_UNUSED(e);
357     MRP_UNUSED(idx);
358     MRP_UNUSED(user_data);
359
360     MRP_UNUSED(p);
361
362     return;
363 }
364
365
366 static void stream_notify(stream_t *s, srs_voice_event_type_t event)
367 {
368     int                mask = (1 << event);
369     srs_voice_event_t  e;
370
371     if (s->cb == NULL || !(s->event_mask & mask))
372         return;
373
374     if ((mask & PULSE_MASK_ONESHOT) && (s->fired_mask & mask))
375         return;
376
377     e.type = event;
378     e.id   = s->id;
379
380     switch (event) {
381     case PULSE_STREAM_STARTED:
382         e.data.progress.pcnt = 0;
383         e.data.progress.msec = 0;
384         break;
385
386     case PULSE_STREAM_PROGRESS:
387         e.data.progress.pcnt = ((1.0 * s->offs) / s->size) * 100;
388         e.data.progress.msec = ((1.0 * s->offs) / s->size) * s->msec;
389         break;
390
391     case PULSE_STREAM_COMPLETED:
392         e.data.progress.pcnt = ((1.0 * s->offs) / s->size) * 100;
393         e.data.progress.msec = ((1.0 * s->offs) / s->size) * s->msec;
394         break;
395
396     case PULSE_STREAM_ABORTED:
397         e.data.progress.pcnt = 0;
398         e.data.progress.msec = 0;
399         break;
400
401     default:
402         return;
403     }
404
405     stream_ref(s);
406     s->cb(s->p->f, &e, s->user_data);
407     stream_unref(s);
408 }
409
410
411 static void stream_state_cb(pa_stream *ps, void *user_data)
412 {
413     stream_t           *s   = (stream_t *)user_data;
414     pulse_t            *p   = s->p;
415     pa_context_state_t  cst = pa_context_get_state(p->pc);
416     pa_stream_state_t   sst;
417
418     if (cst == PA_CONTEXT_TERMINATED || cst == PA_CONTEXT_FAILED)
419         return;
420
421     stream_ref(s);
422
423     switch ((sst = pa_stream_get_state(s->s))) {
424     case PA_STREAM_CREATING:
425         mrp_debug("stream #%u being created", s->id);
426         break;
427
428     case PA_STREAM_READY:
429         mrp_debug("stream #%u ready", s->id);
430         stream_notify(s, PULSE_STREAM_STARTED);
431         break;
432
433     case PA_STREAM_TERMINATED:
434     case PA_STREAM_FAILED:
435     default:
436         mrp_debug("stream #%u state %d", s->id, sst);
437
438         pa_stream_disconnect(s->s);
439         pa_stream_set_state_callback(s->s, NULL, NULL);
440         pa_stream_set_write_callback(s->s, NULL, NULL);
441
442         if (sst == PA_STREAM_TERMINATED)
443             stream_notify(s, PULSE_STREAM_COMPLETED);
444         else
445             stream_notify(s, PULSE_STREAM_ABORTED);
446     }
447
448     stream_unref(s);
449 }
450
451
452 static void stream_drain(stream_t *s)
453 {
454     if (s->drain == NULL) {
455         mrp_debug("stream #%u done, draining", s->id);
456         stream_ref(s);
457         s->drain = pa_stream_drain(s->s, stream_drain_cb, s);
458     }
459 }
460
461
462 static void stream_drain_cb(pa_stream *ps, int success, void *user_data)
463 {
464     stream_t *s = (stream_t *)user_data;
465
466     mrp_debug("stream #%u drained %s", s->id,
467               success ? "successfully" : "failed");
468
469     pa_operation_unref(s->drain);
470     s->drain = NULL;
471     stream_notify(s, PULSE_STREAM_COMPLETED);
472     stream_unref(s);
473 }
474
475
476 static void stream_write_cb(pa_stream *ps, size_t size, void *user_data)
477 {
478     stream_t *s = (stream_t *)user_data;
479     int       done;
480
481     stream_notify(s, PULSE_STREAM_PROGRESS);
482
483     if (s->offs == s->size) {
484         pa_stream_set_write_callback(s->s, NULL, NULL);
485         return;
486     }
487
488     stream_ref(s);
489
490     if (s->offs + size >= s->size) {
491         size = s->size - s->offs;
492         done = TRUE;
493     }
494     else
495         done = FALSE;
496
497     if (pa_stream_write(s->s, s->buf + s->offs, size, NULL, 0,
498                         PA_SEEK_RELATIVE) < 0) {
499         mrp_log_error("festival: failed to write %zd bytes", size);
500         goto out;
501     }
502     else {
503         s->offs += size;
504
505         if (done)
506             stream_stop(s, TRUE, TRUE);
507     }
508
509  out:
510     stream_unref(s);
511 }