1 /* -*- c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /* ====================================================================
3 * Copyright (c) 2008 Carnegie Mellon University. All rights
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
10 * 1. Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
18 * This work was supported in part by funding from the Defense Advanced
19 * Research Projects Agency and the National Science Foundation of the
20 * United States of America, and the CMU Sphinx Speech Consortium.
22 * THIS SOFTWARE IS PROVIDED BY CARNEGIE MELLON UNIVERSITY ``AS IS'' AND
23 * ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
24 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
25 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL CARNEGIE MELLON UNIVERSITY
26 * NOR ITS EMPLOYEES BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34 * ====================================================================
40 * @brief Simple portable thread functions
41 * @author David Huggins-Daines <dhuggins@cs.cmu.edu>
46 #include "sphinxbase/sbthread.h"
47 #include "sphinxbase/ckd_alloc.h"
48 #include "sphinxbase/err.h"
51 * Platform-specific parts: threads, mutexes, and signals.
53 #if (defined(_WIN32) || defined(__CYGWIN__)) && !defined(__SYMBIAN32__)
54 #define _WIN32_WINNT 0x0400
67 /* Ringbuffer for passing messages. */
73 /* Current message is stored here. */
89 sbthread_internal_main(LPVOID arg)
91 sbthread_t *th = (sbthread_t *)arg;
99 sbthread_start(cmd_ln_t *config, sbthread_main func, void *arg)
103 th = ckd_calloc(1, sizeof(*th));
107 th->msgq = sbmsgq_init(256);
108 th->th = CreateThread(NULL, 0, sbthread_internal_main, th, 0, &th->tid);
109 if (th->th == NULL) {
117 sbthread_wait(sbthread_t *th)
121 /* It has already been joined. */
125 rv = WaitForSingleObject(th->th, INFINITE);
126 if (rv == WAIT_FAILED) {
127 E_ERROR("Failed to join thread: WAIT_FAILED\n");
130 GetExitCodeThread(th->th, &exit);
137 cond_timed_wait(HANDLE cond, int sec, int nsec)
141 rv = WaitForSingleObject(cond, INFINITE);
146 ms = sec * 1000 + nsec / (1000*1000);
147 rv = WaitForSingleObject(cond, ms);
152 /* Silvio Moioli: updated to use Unicode */
158 evt = ckd_calloc(1, sizeof(*evt));
159 evt->evt = CreateEventW(NULL, FALSE, FALSE, NULL);
160 if (evt->evt == NULL) {
168 sbevent_free(sbevent_t *evt)
170 CloseHandle(evt->evt);
175 sbevent_signal(sbevent_t *evt)
177 return SetEvent(evt->evt) ? 0 : -1;
181 sbevent_wait(sbevent_t *evt, int sec, int nsec)
185 rv = cond_timed_wait(evt->evt, sec, nsec);
194 mtx = ckd_calloc(1, sizeof(*mtx));
195 InitializeCriticalSection(&mtx->mtx);
200 sbmtx_trylock(sbmtx_t *mtx)
202 return TryEnterCriticalSection(&mtx->mtx) ? 0 : -1;
206 sbmtx_lock(sbmtx_t *mtx)
208 EnterCriticalSection(&mtx->mtx);
213 sbmtx_unlock(sbmtx_t *mtx)
215 LeaveCriticalSection(&mtx->mtx);
220 sbmtx_free(sbmtx_t *mtx)
222 DeleteCriticalSection(&mtx->mtx);
227 sbmsgq_init(size_t depth)
231 msgq = ckd_calloc(1, sizeof(*msgq));
233 msgq->evt = CreateEventW(NULL, FALSE, FALSE, NULL);
234 if (msgq->evt == NULL) {
238 InitializeCriticalSection(&msgq->mtx);
239 msgq->data = ckd_calloc(depth, 1);
240 msgq->msg = ckd_calloc(depth, 1);
245 sbmsgq_free(sbmsgq_t *msgq)
247 CloseHandle(msgq->evt);
248 ckd_free(msgq->data);
254 sbmsgq_send(sbmsgq_t *q, size_t len, void const *data)
256 char const *cdata = (char const *)data;
259 /* Don't allow things bigger than depth to be sent! */
260 if (len + sizeof(len) > q->depth)
263 if (q->nbytes + len + sizeof(len) > q->depth)
264 WaitForSingleObject(q->evt, INFINITE);
266 /* Lock things while we manipulate the buffer (FIXME: this
267 actually should have been atomic with the wait above ...) */
268 EnterCriticalSection(&q->mtx);
269 in = (q->out + q->nbytes) % q->depth;
270 /* First write the size of the message. */
271 if (in + sizeof(len) > q->depth) {
272 /* Handle the annoying case where the size field gets wrapped around. */
273 size_t len1 = q->depth - in;
274 memcpy(q->data + in, &len, len1);
275 memcpy(q->data, ((char *)&len) + len1, sizeof(len) - len1);
276 q->nbytes += sizeof(len);
277 in = sizeof(len) - len1;
280 memcpy(q->data + in, &len, sizeof(len));
281 q->nbytes += sizeof(len);
285 /* Now write the message body. */
286 if (in + len > q->depth) {
287 /* Handle wraparound. */
288 size_t len1 = q->depth - in;
289 memcpy(q->data + in, cdata, len1);
295 memcpy(q->data + in, cdata, len);
298 /* Signal the condition variable. */
301 LeaveCriticalSection(&q->mtx);
307 sbmsgq_wait(sbmsgq_t *q, size_t *out_len, int sec, int nsec)
312 /* Wait for data to be available. */
313 if (q->nbytes == 0) {
314 if (cond_timed_wait(q->evt, sec, nsec) == WAIT_FAILED)
315 /* Timed out or something... */
318 /* Lock to manipulate the queue (FIXME) */
319 EnterCriticalSection(&q->mtx);
320 /* Get the message size. */
321 if (q->out + sizeof(q->msglen) > q->depth) {
322 /* Handle annoying wraparound case. */
323 size_t len1 = q->depth - q->out;
324 memcpy(&q->msglen, q->data + q->out, len1);
325 memcpy(((char *)&q->msglen) + len1, q->data,
326 sizeof(q->msglen) - len1);
327 q->out = sizeof(q->msglen) - len1;
330 memcpy(&q->msglen, q->data + q->out, sizeof(q->msglen));
331 q->out += sizeof(q->msglen);
333 q->nbytes -= sizeof(q->msglen);
334 /* Get the message body. */
337 if (q->out + q->msglen > q->depth) {
338 /* Handle wraparound. */
339 size_t len1 = q->depth - q->out;
340 memcpy(outptr, q->data + q->out, len1);
346 memcpy(outptr, q->data + q->out, len);
350 /* Signal the condition variable. */
353 LeaveCriticalSection(&q->mtx);
355 *out_len = q->msglen;
361 #include <sys/time.h>
372 /* Ringbuffer for passing messages. */
378 /* Current message is stored here. */
396 sbthread_internal_main(void *arg)
398 sbthread_t *th = (sbthread_t *)arg;
401 rv = (*th->func)(th);
402 return (void *)(long)rv;
406 sbthread_start(cmd_ln_t *config, sbthread_main func, void *arg)
411 th = ckd_calloc(1, sizeof(*th));
415 th->msgq = sbmsgq_init(1024);
416 if ((rv = pthread_create(&th->th, NULL, &sbthread_internal_main, th)) != 0) {
417 E_ERROR("Failed to create thread: %d\n", rv);
425 sbthread_wait(sbthread_t *th)
430 /* It has already been joined. */
431 if (th->th == (pthread_t)-1)
434 rv = pthread_join(th->th, &exit);
436 E_ERROR("Failed to join thread: %d\n", rv);
439 th->th = (pthread_t)-1;
440 return (int)(long)exit;
444 sbmsgq_init(size_t depth)
448 msgq = ckd_calloc(1, sizeof(*msgq));
450 if (pthread_cond_init(&msgq->cond, NULL) != 0) {
454 if (pthread_mutex_init(&msgq->mtx, NULL) != 0) {
455 pthread_cond_destroy(&msgq->cond);
459 msgq->data = ckd_calloc(depth, 1);
460 msgq->msg = ckd_calloc(depth, 1);
465 sbmsgq_free(sbmsgq_t *msgq)
467 pthread_mutex_destroy(&msgq->mtx);
468 pthread_cond_destroy(&msgq->cond);
469 ckd_free(msgq->data);
475 sbmsgq_send(sbmsgq_t *q, size_t len, void const *data)
479 /* Don't allow things bigger than depth to be sent! */
480 if (len + sizeof(len) > q->depth)
483 /* Lock the condition variable while we manipulate the buffer. */
484 pthread_mutex_lock(&q->mtx);
485 if (q->nbytes + len + sizeof(len) > q->depth) {
486 /* Unlock and wait for space to be available. */
487 if (pthread_cond_wait(&q->cond, &q->mtx) != 0) {
488 /* Timed out, don't send anything. */
489 pthread_mutex_unlock(&q->mtx);
492 /* Condition is now locked again. */
494 in = (q->out + q->nbytes) % q->depth;
496 /* First write the size of the message. */
497 if (in + sizeof(len) > q->depth) {
498 /* Handle the annoying case where the size field gets wrapped around. */
499 size_t len1 = q->depth - in;
500 memcpy(q->data + in, &len, len1);
501 memcpy(q->data, ((char *)&len) + len1, sizeof(len) - len1);
502 q->nbytes += sizeof(len);
503 in = sizeof(len) - len1;
506 memcpy(q->data + in, &len, sizeof(len));
507 q->nbytes += sizeof(len);
511 /* Now write the message body. */
512 if (in + len > q->depth) {
513 /* Handle wraparound. */
514 size_t len1 = q->depth - in;
515 memcpy(q->data + in, data, len1);
517 data = (char const *)data + len1;
521 memcpy(q->data + in, data, len);
524 /* Signal the condition variable. */
525 pthread_cond_signal(&q->cond);
526 /* Unlock it, we have nothing else to do. */
527 pthread_mutex_unlock(&q->mtx);
532 cond_timed_wait(pthread_cond_t *cond, pthread_mutex_t *mtx, int sec, int nsec)
536 rv = pthread_cond_wait(cond, mtx);
542 gettimeofday(&now, NULL);
543 end.tv_sec = now.tv_sec + sec;
544 end.tv_nsec = now.tv_usec * 1000 + nsec;
545 if (end.tv_nsec > (1000*1000*1000)) {
546 sec += end.tv_nsec / (1000*1000*1000);
547 end.tv_nsec = end.tv_nsec % (1000*1000*1000);
549 rv = pthread_cond_timedwait(cond, mtx, &end);
555 sbmsgq_wait(sbmsgq_t *q, size_t *out_len, int sec, int nsec)
560 /* Lock the condition variable while we manipulate nmsg. */
561 pthread_mutex_lock(&q->mtx);
562 if (q->nbytes == 0) {
563 /* Unlock the condition variable and wait for a signal. */
564 if (cond_timed_wait(&q->cond, &q->mtx, sec, nsec) != 0) {
565 /* Timed out or something... */
566 pthread_mutex_unlock(&q->mtx);
569 /* Condition variable is now locked again. */
571 /* Get the message size. */
572 if (q->out + sizeof(q->msglen) > q->depth) {
573 /* Handle annoying wraparound case. */
574 size_t len1 = q->depth - q->out;
575 memcpy(&q->msglen, q->data + q->out, len1);
576 memcpy(((char *)&q->msglen) + len1, q->data,
577 sizeof(q->msglen) - len1);
578 q->out = sizeof(q->msglen) - len1;
581 memcpy(&q->msglen, q->data + q->out, sizeof(q->msglen));
582 q->out += sizeof(q->msglen);
584 q->nbytes -= sizeof(q->msglen);
585 /* Get the message body. */
588 if (q->out + q->msglen > q->depth) {
589 /* Handle wraparound. */
590 size_t len1 = q->depth - q->out;
591 memcpy(outptr, q->data + q->out, len1);
597 memcpy(outptr, q->data + q->out, len);
601 /* Signal the condition variable. */
602 pthread_cond_signal(&q->cond);
603 /* Unlock the condition variable, we are done. */
604 pthread_mutex_unlock(&q->mtx);
606 *out_len = q->msglen;
616 evt = ckd_calloc(1, sizeof(*evt));
617 if ((rv = pthread_mutex_init(&evt->mtx, NULL)) != 0) {
618 E_ERROR("Failed to initialize mutex: %d\n", rv);
622 if ((rv = pthread_cond_init(&evt->cond, NULL)) != 0) {
623 E_ERROR_SYSTEM("Failed to initialize mutex: %d\n", rv);
624 pthread_mutex_destroy(&evt->mtx);
632 sbevent_free(sbevent_t *evt)
634 pthread_mutex_destroy(&evt->mtx);
635 pthread_cond_destroy(&evt->cond);
640 sbevent_signal(sbevent_t *evt)
644 pthread_mutex_lock(&evt->mtx);
645 evt->signalled = TRUE;
646 rv = pthread_cond_signal(&evt->cond);
647 pthread_mutex_unlock(&evt->mtx);
652 sbevent_wait(sbevent_t *evt, int sec, int nsec)
656 /* Lock the mutex before we check its signalled state. */
657 pthread_mutex_lock(&evt->mtx);
658 /* If it's not signalled, then wait until it is. */
660 rv = cond_timed_wait(&evt->cond, &evt->mtx, sec, nsec);
661 /* Set its state to unsignalled if we were successful. */
663 evt->signalled = FALSE;
664 /* And unlock its mutex. */
665 pthread_mutex_unlock(&evt->mtx);
675 mtx = ckd_calloc(1, sizeof(*mtx));
676 if (pthread_mutex_init(&mtx->mtx, NULL) != 0) {
684 sbmtx_trylock(sbmtx_t *mtx)
686 return pthread_mutex_trylock(&mtx->mtx);
690 sbmtx_lock(sbmtx_t *mtx)
692 return pthread_mutex_lock(&mtx->mtx);
696 sbmtx_unlock(sbmtx_t *mtx)
698 return pthread_mutex_unlock(&mtx->mtx);
702 sbmtx_free(sbmtx_t *mtx)
704 pthread_mutex_destroy(&mtx->mtx);
707 #endif /* not WIN32 */
710 sbthread_config(sbthread_t *th)
716 sbthread_arg(sbthread_t *th)
722 sbthread_msgq(sbthread_t *th)
728 sbthread_send(sbthread_t *th, size_t len, void const *data)
730 return sbmsgq_send(th->msgq, len, data);
734 sbthread_free(sbthread_t *th)
737 sbmsgq_free(th->msgq);