3 #include <murphy/common/debug.h>
4 #include <murphy/common/log.h>
5 #include <murphy/common/mm.h>
6 #include <murphy/common/list.h>
7 #include <murphy/common/refcnt.h>
11 #define SPEECH "speech"
12 #define TTS "text-to-speech"
15 pa_mainloop_api *pa; /* PA mainloop API */
16 char *name; /* PA context name */
17 pa_context *pc; /* PA context */
18 uint32_t strmid; /* next stream id */
19 mrp_list_hook_t streams; /* active streams */
20 int connected; /* whether connection is up */
21 pa_time_event *reconn; /* reconnect timer */
26 srs_pulse_t *p; /* our pulse_t context */
27 pa_stream *s; /* associated PA stream */
28 void *buf; /* pre-generated sample buffer */
29 size_t size; /* buffer size */
30 size_t offs; /* offset to next sample */
31 uint32_t msec; /* length in milliseconds */
32 int rate; /* sample rate */
33 int nchannel; /* number of channels */
34 uint32_t nsample; /* number of samples */
35 uint32_t id; /* our stream id */
36 int event_mask; /* mask of watched events */
37 int fired_mask; /* mask of delivered events */
38 srs_stream_cb_t cb; /* notification callback */
39 void *user_data; /* callback user data */
40 mrp_list_hook_t hook; /* hook to list of streams */
41 mrp_refcnt_t refcnt; /* reference count */
42 int stopped : 1; /* stopped marker */
43 pa_operation *drain; /* draining operation */
47 static void context_state_cb(pa_context *pc, void *user_data);
48 static void context_event_cb(pa_context *pc, pa_subscription_event_type_t e,
49 uint32_t idx, void *user_data);
50 static void stream_state_cb(pa_stream *s, void *user_data);
51 static void stream_write_cb(pa_stream *s, size_t size, void *user_data);
52 static void stream_drain_cb(pa_stream *ps, int success, void *user_data);
54 static void stream_drain(stream_t *s);
55 static void stream_notify(stream_t *s, srs_voice_event_type_t event);
58 srs_pulse_t *srs_pulse_setup(pa_mainloop_api *pa, const char *name)
62 if ((p = mrp_allocz(sizeof(*p))) == NULL)
65 mrp_list_init(&p->streams);
67 p->name = name ? mrp_strdup(name) : mrp_strdup("Winthorpe");
68 p->pc = pa_context_new(p->pa, p->name);
76 mrp_log_info("pulse: trying to connect to server...");
80 pa_context_set_state_callback(p->pc, context_state_cb, p);
81 pa_context_set_subscribe_callback(p->pc, context_event_cb, p);
82 pa_context_connect(p->pc, NULL, PA_CONTEXT_NOFAIL, NULL);
88 void srs_pulse_cleanup(srs_pulse_t *p)
91 pa_context_disconnect(p->pc);
100 static void stream_destroy(stream_t *s)
102 mrp_debug("destroying stream #%d", s->id);
104 mrp_list_delete(&s->hook);
107 pa_stream_set_state_callback(s->s, NULL, NULL);
108 pa_stream_set_write_callback(s->s, NULL, NULL);
109 pa_stream_disconnect(s->s);
110 pa_stream_unref(s->s);
120 static inline stream_t *stream_ref(stream_t *s)
123 mrp_ref_obj(s, refcnt);
124 mrp_debug("stream reference count increased to %d", s->refcnt);
131 static inline void stream_unref(stream_t *s)
133 if (mrp_unref_obj(s, refcnt))
136 mrp_debug("stream reference count decreased to %d", s->refcnt);
140 uint32_t srs_play_stream(srs_pulse_t *p, void *sample_buf, int sample_rate,
141 int nchannel, uint32_t nsample, char **tags,
142 int event_mask, srs_stream_cb_t cb,
153 if ((s = mrp_allocz(sizeof(*s))) == NULL)
156 mrp_list_init(&s->hook);
157 mrp_refcnt_init(&s->refcnt);
160 if ((props = pa_proplist_new()) == NULL) {
165 pa_proplist_sets(props, PA_PROP_MEDIA_ROLE, SPEECH);
167 for (t = tags; *t; t++)
168 pa_proplist_setp(props, *t);
173 memset(&ss, 0, sizeof(ss));
174 ss.format = PA_SAMPLE_S16LE;
175 ss.rate = sample_rate;
176 ss.channels = nchannel;
178 pamin = pa_usec_to_bytes(100 * PA_USEC_PER_MSEC, &ss);
179 pabuf = pa_usec_to_bytes(300 * PA_USEC_PER_MSEC, &ss);
187 s->s = pa_stream_new_with_proplist(p->pc, TTS, &ss, NULL, props);
189 pa_proplist_free(props);
199 s->rate = sample_rate;
200 s->nchannel = nchannel;
201 s->nsample = nsample;
202 s->size = 2 * nsample * nchannel;
203 s->msec = (1.0 * nsample) / sample_rate * 1000;
205 s->user_data = user_data;
207 s->event_mask = event_mask;
209 pa_stream_set_state_callback(s->s, stream_state_cb, s);
210 pa_stream_set_write_callback(s->s, stream_write_cb, s);
212 flags = PA_STREAM_ADJUST_LATENCY;
213 pa_stream_connect_playback(s->s, NULL, &ba, flags, NULL, NULL);
215 mrp_list_append(&p->streams, &s->hook);
221 static void stream_stop(stream_t *s, int drain, int notify)
232 stream_notify(s, s->offs >= s->size ?
233 SRS_STREAM_EVENT_COMPLETED : SRS_STREAM_EVENT_ABORTED);
238 stream_unref(s); /* remove intial reference */
242 int srs_stop_stream(srs_pulse_t *p, uint32_t id, int drain, int notify)
244 mrp_list_hook_t *sp, *sn;
247 mrp_debug("stopping stream #%u", id);
250 mrp_list_foreach(&p->streams, sp, sn) {
251 se = mrp_list_entry(sp, typeof(*se), hook);
264 stream_stop(s, drain, notify);
270 static void connect_timer_cb(pa_mainloop_api *api, pa_time_event *e,
271 const struct timeval *tv, void *user_data)
273 srs_pulse_t *p = (srs_pulse_t *)user_data;
280 pa_context_unref(p->pc);
284 p->pc = pa_context_new(p->pa, p->name);
286 pa_context_set_state_callback(p->pc, context_state_cb, p);
287 pa_context_set_subscribe_callback(p->pc, context_event_cb, p);
288 pa_context_connect(p->pc, NULL, PA_CONTEXT_NOFAIL, NULL);
290 p->pa->time_free(p->reconn);
295 static void stop_reconnect(srs_pulse_t *p)
297 if (p->reconn != NULL) {
298 p->pa->time_free(p->reconn);
304 static void start_reconnect(srs_pulse_t *p)
310 pa_timeval_add(pa_gettimeofday(&tv), 5000);
312 p->reconn = p->pa->time_new(p->pa, &tv, connect_timer_cb, p);
316 static void context_state_cb(pa_context *pc, void *user_data)
318 srs_pulse_t *p = (srs_pulse_t *)user_data;
320 switch (pa_context_get_state(pc)) {
321 case PA_CONTEXT_CONNECTING:
322 mrp_debug("pulse: connection being established...");
323 p->connected = FALSE;
327 case PA_CONTEXT_AUTHORIZING:
328 mrp_debug("pulse: connection being authenticated...");
329 p->connected = FALSE;
332 case PA_CONTEXT_SETTING_NAME:
333 mrp_debug("pulse: setting connection name...");
334 p->connected = FALSE;
337 case PA_CONTEXT_READY:
338 mrp_log_info("pulse: connection up and ready");
342 case PA_CONTEXT_TERMINATED:
343 mrp_log_info("pulse: connection terminated");
344 p->connected = FALSE;
348 case PA_CONTEXT_FAILED:
349 mrp_log_error("pulse: connetion failed");
351 p->connected = FALSE;
358 static void context_event_cb(pa_context *pc, pa_subscription_event_type_t e,
359 uint32_t idx, void *user_data)
361 srs_pulse_t *p = (srs_pulse_t *)user_data;
366 MRP_UNUSED(user_data);
374 static void stream_notify(stream_t *s, srs_voice_event_type_t event)
376 int mask = (1 << event);
379 if (s->cb == NULL || !(s->event_mask & mask))
382 if ((mask & SRS_STREAM_MASK_ONESHOT) && (s->fired_mask & mask))
389 case SRS_STREAM_EVENT_STARTED:
390 e.data.progress.pcnt = 0;
391 e.data.progress.msec = 0;
394 case SRS_STREAM_EVENT_PROGRESS:
395 e.data.progress.pcnt = ((1.0 * s->offs) / s->size) * 100;
396 e.data.progress.msec = ((1.0 * s->offs) / s->size) * s->msec;
399 case SRS_STREAM_EVENT_COMPLETED:
400 e.data.progress.pcnt = ((1.0 * s->offs) / s->size) * 100;
401 e.data.progress.msec = ((1.0 * s->offs) / s->size) * s->msec;
404 case SRS_STREAM_EVENT_ABORTED:
405 e.data.progress.pcnt = 0;
406 e.data.progress.msec = 0;
414 s->cb(s->p, &e, s->user_data);
415 s->fired_mask |= mask;
420 static void stream_state_cb(pa_stream *ps, void *user_data)
422 stream_t *s = (stream_t *)user_data;
423 srs_pulse_t *p = s->p;
424 pa_context_state_t cst = pa_context_get_state(p->pc);
425 pa_stream_state_t sst;
429 if (cst == PA_CONTEXT_TERMINATED || cst == PA_CONTEXT_FAILED)
434 switch ((sst = pa_stream_get_state(s->s))) {
435 case PA_STREAM_CREATING:
436 mrp_debug("pulse: stream #%u being created", s->id);
439 case PA_STREAM_READY:
440 mrp_debug("pulse: stream #%u ready", s->id);
441 stream_notify(s, SRS_STREAM_EVENT_STARTED);
444 case PA_STREAM_TERMINATED:
445 case PA_STREAM_FAILED:
447 mrp_debug("pulse: stream #%u state %d", s->id, sst);
449 pa_stream_disconnect(s->s);
450 pa_stream_set_state_callback(s->s, NULL, NULL);
451 pa_stream_set_write_callback(s->s, NULL, NULL);
453 if (sst == PA_STREAM_TERMINATED)
454 stream_notify(s, SRS_STREAM_EVENT_COMPLETED);
456 stream_notify(s, SRS_STREAM_EVENT_ABORTED);
463 static void stream_drain(stream_t *s)
465 if (s->drain == NULL) {
466 mrp_debug("pulse: stream #%u done, draining", s->id);
468 s->drain = pa_stream_drain(s->s, stream_drain_cb, s);
473 static void stream_drain_cb(pa_stream *ps, int success, void *user_data)
475 stream_t *s = (stream_t *)user_data;
479 mrp_debug("pulse: stream #%u drained %s", s->id,
480 success ? "successfully" : "failed");
482 pa_operation_unref(s->drain);
484 stream_notify(s, SRS_STREAM_EVENT_COMPLETED);
489 static void stream_write_cb(pa_stream *ps, size_t size, void *user_data)
491 stream_t *s = (stream_t *)user_data;
496 stream_notify(s, SRS_STREAM_EVENT_PROGRESS);
498 if (s->offs == s->size) {
499 pa_stream_set_write_callback(s->s, NULL, NULL);
505 if (s->offs + size >= s->size) {
506 size = s->size - s->offs;
512 if (pa_stream_write(s->s, s->buf + s->offs, size, NULL, 0,
513 PA_SEEK_RELATIVE) < 0) {
514 mrp_log_error("pulse: failed to write %zd bytes to stream #%u",
522 stream_stop(s, TRUE, TRUE);