daemon: first shot at a basic common pulse interface.
[profile/ivi/speech-recognition.git] / src / plugins / text-to-speech / espeak / pulse.c
1 #include <errno.h>
2
3 #include <murphy/common/debug.h>
4 #include <murphy/common/log.h>
5 #include <murphy/common/mm.h>
6 #include <murphy/common/list.h>
7 #include <murphy/common/refcnt.h>
8
9 #include "pulse.h"
10
/* Value set for PA_PROP_MEDIA_ROLE on streams that are created with tags. */
#define SPEECH "speech"
/* Name passed to PulseAudio for every playback stream we create. */
#define TTS    "text-to-speech"
13
14 struct pulse_s {
15     pa_mainloop_api *pa;                 /* PA mainloop API */
16     char            *name;               /* PA context name */
17     pa_context      *pc;                 /* PA context */
18     uint32_t         strmid;             /* next stream id */
19     mrp_list_hook_t  streams;            /* active streams */
20     int              connected;          /* whether connection is up */
21     pa_time_event   *reconn;             /* reconnect timer */
22 };
23
24
25 typedef struct {
26     pulse_t           *p;                /* our pulse_t context */
27     pa_stream         *s;                /* associated PA stream */
28     void              *buf;              /* pre-generated sample buffer */
29     size_t             size;             /* buffer size */
30     size_t             offs;             /* offset to next sample */
31     uint32_t           msec;             /* length in milliseconds */
32     int                rate;             /* sample rate */
33     int                nchannel;         /* number of channels */
34     uint32_t           nsample;          /* number of samples */
35     uint32_t           id;               /* our stream id */
36     int                event_mask;       /* mask of watched events */
37     int                fired_mask;       /* mask of delivered events */
38     pulse_stream_cb_t  cb;               /* notification callback */
39     void              *user_data;        /* callback user data */
40     mrp_list_hook_t    hook;             /* hook to list of streams */
41     mrp_refcnt_t       refcnt;           /* reference count */
42     int                stopped : 1;      /* stopped marker */
43     pa_operation      *drain;            /* draining operation */
44 } stream_t;
45
46
47 static void context_state_cb(pa_context *pc, void *user_data);
48 static void context_event_cb(pa_context *pc, pa_subscription_event_type_t e,
49                              uint32_t idx, void *user_data);
50 static void stream_state_cb(pa_stream *s, void *user_data);
51 static void stream_write_cb(pa_stream *s, size_t size, void *user_data);
52 static void stream_drain_cb(pa_stream *ps, int success, void *user_data);
53
54 static void stream_drain(stream_t *s);
55 static void stream_notify(stream_t *s, srs_voice_event_type_t event);
56
57
58 pulse_t *pulse_setup(pa_mainloop_api *pa, const char *name)
59 {
60     pulse_t *p;
61
62     if ((p = mrp_allocz(sizeof(*p))) == NULL)
63         return NULL;
64
65     mrp_list_init(&p->streams);
66     p->pa   = pa;
67     p->name = name ? mrp_strdup(name) : mrp_strdup("Winthorpe");
68     p->pc   = pa_context_new(p->pa, p->name);
69
70     if (p->pc == NULL) {
71         mrp_free(p);
72
73         return NULL;
74     }
75
76     p->strmid = 1;
77
78     pa_context_set_state_callback(p->pc, context_state_cb, p);
79     pa_context_set_subscribe_callback(p->pc, context_event_cb, p);
80     pa_context_connect(p->pc, NULL, PA_CONTEXT_NOFAIL, NULL);
81
82     return p;
83 }
84
85
86 void pulse_cleanup(pulse_t *p)
87 {
88     if (p->pc != NULL) {
89         pa_context_disconnect(p->pc);
90         p->pc = NULL;
91         mrp_free(p->name);
92         p->name = NULL;
93         mrp_free(p);
94     }
95 }
96
97
98 static void stream_destroy(stream_t *s)
99 {
100     mrp_debug("destroying stream #%d", s->id);
101
102     mrp_list_delete(&s->hook);
103
104     if (s->s != NULL) {
105         pa_stream_set_state_callback(s->s, NULL, NULL);
106         pa_stream_set_write_callback(s->s, NULL, NULL);
107         pa_stream_disconnect(s->s);
108         pa_stream_unref(s->s);
109         s->s = NULL;
110         mrp_free(s->buf);
111         s->buf = NULL;
112     }
113
114     mrp_free(s);
115 }
116
117
118 static inline stream_t *stream_ref(stream_t *s)
119 {
120     if (s != NULL) {
121         mrp_ref_obj(s, refcnt);
122         mrp_debug("stream reference count increased to %d", s->refcnt);
123     }
124
125     return s;
126 }
127
128
129 static inline void stream_unref(stream_t *s)
130 {
131     if (mrp_unref_obj(s, refcnt))
132         stream_destroy(s);
133     else
134         mrp_debug("stream reference count decreased to %d", s->refcnt);
135 }
136
137
138 uint32_t pulse_play_stream(pulse_t *p, void *sample_buf, int sample_rate,
139                            int nchannel, uint32_t nsample, char **tags,
140                            int event_mask, pulse_stream_cb_t cb,
141                            void *user_data)
142 {
143     char           **t;
144     stream_t        *s;
145     pa_sample_spec   ss;
146     pa_buffer_attr   ba;
147     pa_proplist     *props;
148     size_t           pamin, pabuf;
149     int              flags;
150
151     if ((s = mrp_allocz(sizeof(*s))) == NULL)
152         return 0;
153
154     mrp_list_init(&s->hook);
155     mrp_refcnt_init(&s->refcnt);
156
157     if (tags != NULL) {
158         if ((props = pa_proplist_new()) == NULL) {
159             mrp_free(s);
160             return 0;
161         }
162
163         pa_proplist_sets(props, PA_PROP_MEDIA_ROLE, SPEECH);
164
165         for (t = tags; *t; t++)
166             pa_proplist_setp(props, *t);
167     }
168     else
169         props = NULL;
170
171     memset(&ss, 0, sizeof(ss));
172     ss.format   = PA_SAMPLE_S16LE;
173     ss.rate     = sample_rate;
174     ss.channels = nchannel;
175
176     pamin  = pa_usec_to_bytes(100 * PA_USEC_PER_MSEC, &ss);
177     pabuf  = pa_usec_to_bytes(300 * PA_USEC_PER_MSEC, &ss);
178
179     ba.maxlength = -1;
180     ba.tlength   = pabuf;
181     ba.minreq    = pamin;
182     ba.prebuf    = pabuf;
183     ba.fragsize  = -1;
184
185     s->s = pa_stream_new_with_proplist(p->pc, TTS, &ss, NULL, props);
186     if (props != NULL)
187         pa_proplist_free(props);
188
189     if (s->s == NULL) {
190         mrp_free(s);
191         return 0;
192     }
193
194     s->p          = p;
195     s->buf        = sample_buf;
196     s->offs       = 0;
197     s->rate       = sample_rate;
198     s->nchannel   = nchannel;
199     s->nsample    = nsample;
200     s->size       = 2 * nsample * nchannel;
201     s->msec       = (1.0 * nsample) / sample_rate * 1000;
202     s->cb         = cb;
203     s->user_data  = user_data;
204     s->id         = p->strmid++;
205     s->event_mask = event_mask;
206
207     pa_stream_set_state_callback(s->s, stream_state_cb, s);
208     pa_stream_set_write_callback(s->s, stream_write_cb, s);
209
210     flags = PA_STREAM_ADJUST_LATENCY;
211     pa_stream_connect_playback(s->s, NULL, &ba, flags, NULL, NULL);
212
213     mrp_list_append(&p->streams, &s->hook);
214
215     return s->id;
216 }
217
218
219 static void stream_stop(stream_t *s, int drain, int notify)
220 {
221     if (s->stopped)
222         return;
223     else
224         s->stopped = TRUE;
225
226     if (!notify)
227         s->event_mask = 0;
228
229     if (!drain) {
230         stream_notify(s, s->offs >= s->size ?
231                       PULSE_STREAM_COMPLETED : PULSE_STREAM_ABORTED);
232     }
233     else
234         stream_drain(s);
235
236     stream_unref(s);                     /* remove intial reference */
237 }
238
239
240 int pulse_stop_stream(pulse_t *p, uint32_t id, int drain, int notify)
241 {
242     mrp_list_hook_t *sp, *sn;
243     stream_t        *se, *s;
244
245     mrp_debug("stopping stream #%u", id);
246
247     s = NULL;
248     mrp_list_foreach(&p->streams, sp, sn) {
249         se = mrp_list_entry(sp, typeof(*se), hook);
250
251         if (se->id == id) {
252             s = se;
253             break;
254         }
255     }
256
257     if (s == NULL) {
258         errno = ENOENT;
259         return -1;
260     }
261
262     stream_stop(s, drain, notify);
263
264     return 0;
265 }
266
267
268 static void connect_timer_cb(pa_mainloop_api *api, pa_time_event *e,
269                              const struct timeval *tv, void *user_data)
270 {
271     pulse_t *p = (pulse_t *)user_data;
272
273     if (p->pc != NULL) {
274         pa_context_unref(p->pc);
275         p->pc = NULL;
276     }
277
278     p->pc = pa_context_new(p->pa, p->name);
279
280     pa_context_set_state_callback(p->pc, context_state_cb, p);
281     pa_context_set_subscribe_callback(p->pc, context_event_cb, p);
282     pa_context_connect(p->pc, NULL, PA_CONTEXT_NOFAIL, NULL);
283
284     p->pa->time_free(p->reconn);
285     p->reconn = NULL;
286 }
287
288
289 static void stop_reconnect(pulse_t *p)
290 {
291     if (p->reconn != NULL) {
292         p->pa->time_free(p->reconn);
293         p->reconn = NULL;
294     }
295 }
296
297
298 static void start_reconnect(pulse_t *p)
299 {
300     struct timeval tv;
301
302     stop_reconnect(p);
303
304     pa_timeval_add(pa_gettimeofday(&tv), 5000);
305
306     p->reconn = p->pa->time_new(p->pa, &tv, connect_timer_cb, p);
307 }
308
309
310 static void context_state_cb(pa_context *pc, void *user_data)
311 {
312     pulse_t *p = (pulse_t *)user_data;
313
314     switch (pa_context_get_state(pc)) {
315     case PA_CONTEXT_CONNECTING:
316         mrp_debug("pulse: connection being established...");
317         p->connected = FALSE;
318         stop_reconnect(p);
319         break;
320
321     case PA_CONTEXT_AUTHORIZING:
322         mrp_debug("pulse: connection being authenticated...");
323         p->connected = FALSE;
324         break;
325
326     case PA_CONTEXT_SETTING_NAME:
327         mrp_debug("pulse: setting connection name...");
328         p->connected = FALSE;
329         break;
330
331     case PA_CONTEXT_READY:
332         mrp_log_info("pulse: connection up and ready");
333         p->connected = TRUE;
334         break;
335
336     case PA_CONTEXT_TERMINATED:
337         mrp_log_info("pulse: connection terminated");
338         p->connected = FALSE;
339         start_reconnect(p);
340         break;
341
342     case PA_CONTEXT_FAILED:
343         mrp_log_error("pulse: connetion failed");
344     default:
345         p->connected = FALSE;
346         start_reconnect(p);
347         break;
348     }
349 }
350
351
352 static void context_event_cb(pa_context *pc, pa_subscription_event_type_t e,
353                              uint32_t idx, void *user_data)
354 {
355     pulse_t *p = (pulse_t *)user_data;
356
357     MRP_UNUSED(pc);
358     MRP_UNUSED(e);
359     MRP_UNUSED(idx);
360     MRP_UNUSED(user_data);
361
362     MRP_UNUSED(p);
363
364     return;
365 }
366
367
368 static void stream_notify(stream_t *s, srs_voice_event_type_t event)
369 {
370     int                mask = (1 << event);
371     srs_voice_event_t  e;
372
373     if (s->cb == NULL || !(s->event_mask & mask))
374         return;
375
376     if ((mask & PULSE_MASK_ONESHOT) && (s->fired_mask & mask))
377         return;
378
379     e.type = event;
380     e.id   = s->id;
381
382     switch (event) {
383     case PULSE_STREAM_STARTED:
384         e.data.progress.pcnt = 0;
385         e.data.progress.msec = 0;
386         break;
387
388     case PULSE_STREAM_PROGRESS:
389         e.data.progress.pcnt = ((1.0 * s->offs) / s->size) * 100;
390         e.data.progress.msec = ((1.0 * s->offs) / s->size) * s->msec;
391         break;
392
393     case PULSE_STREAM_COMPLETED:
394         e.data.progress.pcnt = ((1.0 * s->offs) / s->size) * 100;
395         e.data.progress.msec = ((1.0 * s->offs) / s->size) * s->msec;
396         break;
397
398     case PULSE_STREAM_ABORTED:
399         e.data.progress.pcnt = 0;
400         e.data.progress.msec = 0;
401         break;
402
403     default:
404         return;
405     }
406
407     stream_ref(s);
408     s->cb(s->p, &e, s->user_data);
409     stream_unref(s);
410 }
411
412
413 static void stream_state_cb(pa_stream *ps, void *user_data)
414 {
415     stream_t           *s   = (stream_t *)user_data;
416     pulse_t            *p   = s->p;
417     pa_context_state_t  cst = pa_context_get_state(p->pc);
418     pa_stream_state_t   sst;
419
420     if (cst == PA_CONTEXT_TERMINATED || cst == PA_CONTEXT_FAILED)
421         return;
422
423     stream_ref(s);
424
425     switch ((sst = pa_stream_get_state(s->s))) {
426     case PA_STREAM_CREATING:
427         mrp_debug("pulse: stream #%u being created", s->id);
428         break;
429
430     case PA_STREAM_READY:
431         mrp_debug("pulse: stream #%u ready", s->id);
432         stream_notify(s, PULSE_STREAM_STARTED);
433         break;
434
435     case PA_STREAM_TERMINATED:
436     case PA_STREAM_FAILED:
437     default:
438         mrp_debug("pulse: stream #%u state %d", s->id, sst);
439
440         pa_stream_disconnect(s->s);
441         pa_stream_set_state_callback(s->s, NULL, NULL);
442         pa_stream_set_write_callback(s->s, NULL, NULL);
443
444         if (sst == PA_STREAM_TERMINATED)
445             stream_notify(s, PULSE_STREAM_COMPLETED);
446         else
447             stream_notify(s, PULSE_STREAM_ABORTED);
448     }
449
450     stream_unref(s);
451 }
452
453
454 static void stream_drain(stream_t *s)
455 {
456     if (s->drain == NULL) {
457         mrp_debug("pulse: stream #%u done, draining", s->id);
458         stream_ref(s);
459         s->drain = pa_stream_drain(s->s, stream_drain_cb, s);
460     }
461 }
462
463
464 static void stream_drain_cb(pa_stream *ps, int success, void *user_data)
465 {
466     stream_t *s = (stream_t *)user_data;
467
468     mrp_debug("pulse: stream #%u drained %s", s->id,
469               success ? "successfully" : "failed");
470
471     pa_operation_unref(s->drain);
472     s->drain = NULL;
473     stream_notify(s, PULSE_STREAM_COMPLETED);
474     stream_unref(s);
475 }
476
477
478 static void stream_write_cb(pa_stream *ps, size_t size, void *user_data)
479 {
480     stream_t *s = (stream_t *)user_data;
481     int       done;
482
483     stream_notify(s, PULSE_STREAM_PROGRESS);
484
485     if (s->offs == s->size) {
486         pa_stream_set_write_callback(s->s, NULL, NULL);
487         return;
488     }
489
490     stream_ref(s);
491
492     if (s->offs + size >= s->size) {
493         size = s->size - s->offs;
494         done = TRUE;
495     }
496     else
497         done = FALSE;
498
499     if (pa_stream_write(s->s, s->buf + s->offs, size, NULL, 0,
500                         PA_SEEK_RELATIVE) < 0) {
501         mrp_log_error("pulse: failed to write %zd bytes to stream #%u",
502                       size, s->id);
503         goto out;
504     }
505     else {
506         s->offs += size;
507
508         if (done)
509             stream_stop(s, TRUE, TRUE);
510     }
511
512  out:
513     stream_unref(s);
514 }