packaging: bumped version, updated changelog.
[profile/ivi/speech-recognition.git] / src / daemon / pulse.c
1 #include <errno.h>
2
3 #include <murphy/common/debug.h>
4 #include <murphy/common/log.h>
5 #include <murphy/common/mm.h>
6 #include <murphy/common/list.h>
7 #include <murphy/common/refcnt.h>
8
9 #include "pulse.h"
10
11 #define SPEECH "speech"
12 #define TTS    "text-to-speech"
13
14 struct srs_pulse_s {
15     pa_mainloop_api *pa;                 /* PA mainloop API */
16     char            *name;               /* PA context name */
17     pa_context      *pc;                 /* PA context */
18     uint32_t         strmid;             /* next stream id */
19     mrp_list_hook_t  streams;            /* active streams */
20     int              connected;          /* whether connection is up */
21     pa_time_event   *reconn;             /* reconnect timer */
22 };
23
24
25 typedef struct {
26     srs_pulse_t       *p;                /* our pulse_t context */
27     pa_stream         *s;                /* associated PA stream */
28     void              *buf;              /* pre-generated sample buffer */
29     size_t             size;             /* buffer size */
30     size_t             offs;             /* offset to next sample */
31     uint32_t           msec;             /* length in milliseconds */
32     int                rate;             /* sample rate */
33     int                nchannel;         /* number of channels */
34     uint32_t           nsample;          /* number of samples */
35     uint32_t           id;               /* our stream id */
36     int                event_mask;       /* mask of watched events */
37     int                fired_mask;       /* mask of delivered events */
38     srs_stream_cb_t    cb;               /* notification callback */
39     void              *user_data;        /* callback user data */
40     mrp_list_hook_t    hook;             /* hook to list of streams */
41     mrp_refcnt_t       refcnt;           /* reference count */
42     int                stopped : 1;      /* stopped marker */
43     pa_operation      *drain;            /* draining operation */
44 } stream_t;
45
46
47 static void context_state_cb(pa_context *pc, void *user_data);
48 static void context_event_cb(pa_context *pc, pa_subscription_event_type_t e,
49                              uint32_t idx, void *user_data);
50 static void stream_state_cb(pa_stream *s, void *user_data);
51 static void stream_write_cb(pa_stream *s, size_t size, void *user_data);
52 static void stream_drain_cb(pa_stream *ps, int success, void *user_data);
53
54 static void stream_drain(stream_t *s);
55 static void stream_notify(stream_t *s, srs_voice_event_type_t event);
56
57
58 srs_pulse_t *srs_pulse_setup(pa_mainloop_api *pa, const char *name)
59 {
60     srs_pulse_t *p;
61
62     if ((p = mrp_allocz(sizeof(*p))) == NULL)
63         return NULL;
64
65     mrp_list_init(&p->streams);
66     p->pa   = pa;
67     p->name = name ? mrp_strdup(name) : mrp_strdup("Winthorpe");
68     p->pc   = pa_context_new(p->pa, p->name);
69
70     if (p->pc == NULL) {
71         mrp_free(p);
72
73         return NULL;
74     }
75
76     mrp_log_info("pulse: trying to connect to server...");
77
78     p->strmid = 1;
79
80     pa_context_set_state_callback(p->pc, context_state_cb, p);
81     pa_context_set_subscribe_callback(p->pc, context_event_cb, p);
82     pa_context_connect(p->pc, NULL, PA_CONTEXT_NOFAIL, NULL);
83
84     return p;
85 }
86
87
88 void srs_pulse_cleanup(srs_pulse_t *p)
89 {
90     if (p->pc != NULL) {
91         pa_context_disconnect(p->pc);
92         p->pc = NULL;
93         mrp_free(p->name);
94         p->name = NULL;
95         mrp_free(p);
96     }
97 }
98
99
100 static void stream_destroy(stream_t *s)
101 {
102     mrp_debug("destroying stream #%d", s->id);
103
104     mrp_list_delete(&s->hook);
105
106     if (s->s != NULL) {
107         pa_stream_set_state_callback(s->s, NULL, NULL);
108         pa_stream_set_write_callback(s->s, NULL, NULL);
109         pa_stream_disconnect(s->s);
110         pa_stream_unref(s->s);
111         s->s = NULL;
112         mrp_free(s->buf);
113         s->buf = NULL;
114     }
115
116     mrp_free(s);
117 }
118
119
120 static inline stream_t *stream_ref(stream_t *s)
121 {
122     if (s != NULL) {
123         mrp_ref_obj(s, refcnt);
124         mrp_debug("stream reference count increased to %d", s->refcnt);
125     }
126
127     return s;
128 }
129
130
131 static inline void stream_unref(stream_t *s)
132 {
133     if (mrp_unref_obj(s, refcnt))
134         stream_destroy(s);
135     else
136         mrp_debug("stream reference count decreased to %d", s->refcnt);
137 }
138
139
140 uint32_t srs_play_stream(srs_pulse_t *p, void *sample_buf, int sample_rate,
141                          int nchannel, uint32_t nsample, char **tags,
142                          int event_mask, srs_stream_cb_t cb,
143                          void *user_data)
144 {
145     char           **t;
146     stream_t        *s;
147     pa_sample_spec   ss;
148     pa_buffer_attr   ba;
149     pa_proplist     *props;
150     size_t           pamin, pabuf;
151     int              flags;
152
153     if ((s = mrp_allocz(sizeof(*s))) == NULL)
154         return 0;
155
156     mrp_list_init(&s->hook);
157     mrp_refcnt_init(&s->refcnt);
158
159     if (tags != NULL) {
160         if ((props = pa_proplist_new()) == NULL) {
161             mrp_free(s);
162             return 0;
163         }
164
165         pa_proplist_sets(props, PA_PROP_MEDIA_ROLE, SPEECH);
166
167         for (t = tags; *t; t++)
168             pa_proplist_setp(props, *t);
169     }
170     else
171         props = NULL;
172
173     memset(&ss, 0, sizeof(ss));
174     ss.format   = PA_SAMPLE_S16LE;
175     ss.rate     = sample_rate;
176     ss.channels = nchannel;
177
178     pamin  = pa_usec_to_bytes(100 * PA_USEC_PER_MSEC, &ss);
179     pabuf  = pa_usec_to_bytes(300 * PA_USEC_PER_MSEC, &ss);
180
181     ba.maxlength = -1;
182     ba.tlength   = pabuf;
183     ba.minreq    = pamin;
184     ba.prebuf    = pabuf;
185     ba.fragsize  = -1;
186
187     s->s = pa_stream_new_with_proplist(p->pc, TTS, &ss, NULL, props);
188     if (props != NULL)
189         pa_proplist_free(props);
190
191     if (s->s == NULL) {
192         mrp_free(s);
193         return 0;
194     }
195
196     s->p          = p;
197     s->buf        = sample_buf;
198     s->offs       = 0;
199     s->rate       = sample_rate;
200     s->nchannel   = nchannel;
201     s->nsample    = nsample;
202     s->size       = 2 * nsample * nchannel;
203     s->msec       = (1.0 * nsample) / sample_rate * 1000;
204     s->cb         = cb;
205     s->user_data  = user_data;
206     s->id         = p->strmid++;
207     s->event_mask = event_mask;
208
209     pa_stream_set_state_callback(s->s, stream_state_cb, s);
210     pa_stream_set_write_callback(s->s, stream_write_cb, s);
211
212     flags = PA_STREAM_ADJUST_LATENCY;
213     pa_stream_connect_playback(s->s, NULL, &ba, flags, NULL, NULL);
214
215     mrp_list_append(&p->streams, &s->hook);
216
217     return s->id;
218 }
219
220
221 static void stream_stop(stream_t *s, int drain, int notify)
222 {
223     if (s->stopped)
224         return;
225     else
226         s->stopped = TRUE;
227
228     if (!notify)
229         s->event_mask = 0;
230
231     if (!drain) {
232         stream_notify(s, s->offs >= s->size ?
233                       SRS_STREAM_EVENT_COMPLETED : SRS_STREAM_EVENT_ABORTED);
234     }
235     else
236         stream_drain(s);
237
238     stream_unref(s);                     /* remove intial reference */
239 }
240
241
242 int srs_stop_stream(srs_pulse_t *p, uint32_t id, int drain, int notify)
243 {
244     mrp_list_hook_t *sp, *sn;
245     stream_t        *se, *s;
246
247     mrp_debug("stopping stream #%u", id);
248
249     s = NULL;
250     mrp_list_foreach(&p->streams, sp, sn) {
251         se = mrp_list_entry(sp, typeof(*se), hook);
252
253         if (se->id == id) {
254             s = se;
255             break;
256         }
257     }
258
259     if (s == NULL) {
260         errno = ENOENT;
261         return -1;
262     }
263
264     stream_stop(s, drain, notify);
265
266     return 0;
267 }
268
269
270 static void connect_timer_cb(pa_mainloop_api *api, pa_time_event *e,
271                              const struct timeval *tv, void *user_data)
272 {
273     srs_pulse_t *p = (srs_pulse_t *)user_data;
274
275     MRP_UNUSED(api);
276     MRP_UNUSED(e);
277     MRP_UNUSED(tv);
278
279     if (p->pc != NULL) {
280         pa_context_unref(p->pc);
281         p->pc = NULL;
282     }
283
284     p->pc = pa_context_new(p->pa, p->name);
285
286     pa_context_set_state_callback(p->pc, context_state_cb, p);
287     pa_context_set_subscribe_callback(p->pc, context_event_cb, p);
288     pa_context_connect(p->pc, NULL, PA_CONTEXT_NOFAIL, NULL);
289
290     p->pa->time_free(p->reconn);
291     p->reconn = NULL;
292 }
293
294
295 static void stop_reconnect(srs_pulse_t *p)
296 {
297     if (p->reconn != NULL) {
298         p->pa->time_free(p->reconn);
299         p->reconn = NULL;
300     }
301 }
302
303
304 static void start_reconnect(srs_pulse_t *p)
305 {
306     struct timeval tv;
307
308     stop_reconnect(p);
309
310     pa_timeval_add(pa_gettimeofday(&tv), 5000);
311
312     p->reconn = p->pa->time_new(p->pa, &tv, connect_timer_cb, p);
313 }
314
315
316 static void context_state_cb(pa_context *pc, void *user_data)
317 {
318     srs_pulse_t *p = (srs_pulse_t *)user_data;
319
320     switch (pa_context_get_state(pc)) {
321     case PA_CONTEXT_CONNECTING:
322         mrp_debug("pulse: connection being established...");
323         p->connected = FALSE;
324         stop_reconnect(p);
325         break;
326
327     case PA_CONTEXT_AUTHORIZING:
328         mrp_debug("pulse: connection being authenticated...");
329         p->connected = FALSE;
330         break;
331
332     case PA_CONTEXT_SETTING_NAME:
333         mrp_debug("pulse: setting connection name...");
334         p->connected = FALSE;
335         break;
336
337     case PA_CONTEXT_READY:
338         mrp_log_info("pulse: connection up and ready");
339         p->connected = TRUE;
340         break;
341
342     case PA_CONTEXT_TERMINATED:
343         mrp_log_info("pulse: connection terminated");
344         p->connected = FALSE;
345         start_reconnect(p);
346         break;
347
348     case PA_CONTEXT_FAILED:
349         mrp_log_error("pulse: connetion failed");
350     default:
351         p->connected = FALSE;
352         start_reconnect(p);
353         break;
354     }
355 }
356
357
358 static void context_event_cb(pa_context *pc, pa_subscription_event_type_t e,
359                              uint32_t idx, void *user_data)
360 {
361     srs_pulse_t *p = (srs_pulse_t *)user_data;
362
363     MRP_UNUSED(pc);
364     MRP_UNUSED(e);
365     MRP_UNUSED(idx);
366     MRP_UNUSED(user_data);
367
368     MRP_UNUSED(p);
369
370     return;
371 }
372
373
374 static void stream_notify(stream_t *s, srs_voice_event_type_t event)
375 {
376     int                mask = (1 << event);
377     srs_voice_event_t  e;
378
379     if (s->cb == NULL || !(s->event_mask & mask))
380         return;
381
382     if ((mask & SRS_STREAM_MASK_ONESHOT) && (s->fired_mask & mask))
383         return;
384
385     e.type = event;
386     e.id   = s->id;
387
388     switch (event) {
389     case SRS_STREAM_EVENT_STARTED:
390         e.data.progress.pcnt = 0;
391         e.data.progress.msec = 0;
392         break;
393
394     case SRS_STREAM_EVENT_PROGRESS:
395         e.data.progress.pcnt = ((1.0 * s->offs) / s->size) * 100;
396         e.data.progress.msec = ((1.0 * s->offs) / s->size) * s->msec;
397         break;
398
399     case SRS_STREAM_EVENT_COMPLETED:
400         e.data.progress.pcnt = ((1.0 * s->offs) / s->size) * 100;
401         e.data.progress.msec = ((1.0 * s->offs) / s->size) * s->msec;
402         break;
403
404     case SRS_STREAM_EVENT_ABORTED:
405         e.data.progress.pcnt = 0;
406         e.data.progress.msec = 0;
407         break;
408
409     default:
410         return;
411     }
412
413     stream_ref(s);
414     s->cb(s->p, &e, s->user_data);
415     s->fired_mask |= mask;
416     stream_unref(s);
417 }
418
419
420 static void stream_state_cb(pa_stream *ps, void *user_data)
421 {
422     stream_t           *s   = (stream_t *)user_data;
423     srs_pulse_t        *p   = s->p;
424     pa_context_state_t  cst = pa_context_get_state(p->pc);
425     pa_stream_state_t   sst;
426
427     MRP_UNUSED(ps);
428
429     if (cst == PA_CONTEXT_TERMINATED || cst == PA_CONTEXT_FAILED)
430         return;
431
432     stream_ref(s);
433
434     switch ((sst = pa_stream_get_state(s->s))) {
435     case PA_STREAM_CREATING:
436         mrp_debug("pulse: stream #%u being created", s->id);
437         break;
438
439     case PA_STREAM_READY:
440         mrp_debug("pulse: stream #%u ready", s->id);
441         stream_notify(s, SRS_STREAM_EVENT_STARTED);
442         break;
443
444     case PA_STREAM_TERMINATED:
445     case PA_STREAM_FAILED:
446     default:
447         mrp_debug("pulse: stream #%u state %d", s->id, sst);
448
449         pa_stream_disconnect(s->s);
450         pa_stream_set_state_callback(s->s, NULL, NULL);
451         pa_stream_set_write_callback(s->s, NULL, NULL);
452
453         if (sst == PA_STREAM_TERMINATED)
454             stream_notify(s, SRS_STREAM_EVENT_COMPLETED);
455         else
456             stream_notify(s, SRS_STREAM_EVENT_ABORTED);
457     }
458
459     stream_unref(s);
460 }
461
462
463 static void stream_drain(stream_t *s)
464 {
465     if (s->drain == NULL) {
466         mrp_debug("pulse: stream #%u done, draining", s->id);
467         stream_ref(s);
468         s->drain = pa_stream_drain(s->s, stream_drain_cb, s);
469     }
470 }
471
472
473 static void stream_drain_cb(pa_stream *ps, int success, void *user_data)
474 {
475     stream_t *s = (stream_t *)user_data;
476
477     MRP_UNUSED(ps);
478
479     mrp_debug("pulse: stream #%u drained %s", s->id,
480               success ? "successfully" : "failed");
481
482     pa_operation_unref(s->drain);
483     s->drain = NULL;
484     stream_notify(s, SRS_STREAM_EVENT_COMPLETED);
485     stream_unref(s);
486 }
487
488
489 static void stream_write_cb(pa_stream *ps, size_t size, void *user_data)
490 {
491     stream_t *s = (stream_t *)user_data;
492     int       done;
493
494     MRP_UNUSED(ps);
495
496     stream_notify(s, SRS_STREAM_EVENT_PROGRESS);
497
498     if (s->offs == s->size) {
499         pa_stream_set_write_callback(s->s, NULL, NULL);
500         return;
501     }
502
503     stream_ref(s);
504
505     if (s->offs + size >= s->size) {
506         size = s->size - s->offs;
507         done = TRUE;
508     }
509     else
510         done = FALSE;
511
512     if (pa_stream_write(s->s, s->buf + s->offs, size, NULL, 0,
513                         PA_SEEK_RELATIVE) < 0) {
514         mrp_log_error("pulse: failed to write %zd bytes to stream #%u",
515                       size, s->id);
516         goto out;
517     }
518     else {
519         s->offs += size;
520
521         if (done)
522             stream_stop(s, TRUE, TRUE);
523     }
524
525  out:
526     stream_unref(s);
527 }