Initial import to Tizen
[profile/ivi/sphinxbase.git] / src / libsphinxbase / util / sbthread.c
1 /* -*- c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /* ====================================================================
3  * Copyright (c) 2008 Carnegie Mellon University.  All rights 
4  * reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright
11  *    notice, this list of conditions and the following disclaimer. 
12  *
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  *
18  * This work was supported in part by funding from the Defense Advanced 
19  * Research Projects Agency and the National Science Foundation of the 
20  * United States of America, and the CMU Sphinx Speech Consortium.
21  *
22  * THIS SOFTWARE IS PROVIDED BY CARNEGIE MELLON UNIVERSITY ``AS IS'' AND 
23  * ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, 
24  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
25  * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL CARNEGIE MELLON UNIVERSITY
26  * NOR ITS EMPLOYEES BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 
28  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 
29  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 
30  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 
31  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 
32  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33  *
34  * ====================================================================
35  *
36  */
37
38 /**
39  * @file sbthread.c
40  * @brief Simple portable thread functions
41  * @author David Huggins-Daines <dhuggins@cs.cmu.edu>
42  */
43
44 #include <string.h>
45
46 #include "sphinxbase/sbthread.h"
47 #include "sphinxbase/ckd_alloc.h"
48 #include "sphinxbase/err.h"
49
50 /*
51  * Platform-specific parts: threads, mutexes, and signals.
52  */
53 #if (defined(_WIN32) || defined(__CYGWIN__)) && !defined(__SYMBIAN32__)
54 #define _WIN32_WINNT 0x0400
55 #include <windows.h>
56
57 struct sbthread_s {
58     cmd_ln_t *config;
59     sbmsgq_t *msgq;
60     sbthread_main func;
61     void *arg;
62     HANDLE th;
63     DWORD tid;
64 };
65
66 struct sbmsgq_s {
67     /* Ringbuffer for passing messages. */
68     char *data;
69     size_t depth;
70     size_t out;
71     size_t nbytes;
72
73     /* Current message is stored here. */
74     char *msg;
75     size_t msglen;
76     CRITICAL_SECTION mtx;
77     HANDLE evt;
78 };
79
80 struct sbevent_s {
81     HANDLE evt;
82 };
83
84 struct sbmtx_s {
85     CRITICAL_SECTION mtx;
86 };
87
88 DWORD WINAPI
89 sbthread_internal_main(LPVOID arg)
90 {
91     sbthread_t *th = (sbthread_t *)arg;
92     int rv;
93
94     rv = (*th->func)(th);
95     return (DWORD)rv;
96 }
97
98 sbthread_t *
99 sbthread_start(cmd_ln_t *config, sbthread_main func, void *arg)
100 {
101     sbthread_t *th;
102
103     th = ckd_calloc(1, sizeof(*th));
104     th->config = config;
105     th->func = func;
106     th->arg = arg;
107     th->msgq = sbmsgq_init(256);
108     th->th = CreateThread(NULL, 0, sbthread_internal_main, th, 0, &th->tid);
109     if (th->th == NULL) {
110         sbthread_free(th);
111         return NULL;
112     }
113     return th;
114 }
115
116 int
117 sbthread_wait(sbthread_t *th)
118 {
119     DWORD rv, exit;
120
121     /* It has already been joined. */
122     if (th->th == NULL)
123         return -1;
124
125     rv = WaitForSingleObject(th->th, INFINITE);
126     if (rv == WAIT_FAILED) {
127         E_ERROR("Failed to join thread: WAIT_FAILED\n");
128         return -1;
129     }
130     GetExitCodeThread(th->th, &exit);
131     CloseHandle(th->th);
132     th->th = NULL;
133     return (int)exit;
134 }
135
136 static DWORD
137 cond_timed_wait(HANDLE cond, int sec, int nsec)
138 {
139     DWORD rv;
140     if (sec == -1) {
141         rv = WaitForSingleObject(cond, INFINITE);
142     }
143     else {
144         DWORD ms;
145
146         ms = sec * 1000 + nsec / (1000*1000);
147         rv = WaitForSingleObject(cond, ms);
148     }
149     return rv;
150 }
151
152 /* Silvio Moioli: updated to use Unicode */
153 sbevent_t *
154 sbevent_init(void)
155 {
156     sbevent_t *evt;
157
158     evt = ckd_calloc(1, sizeof(*evt));
159     evt->evt = CreateEventW(NULL, FALSE, FALSE, NULL);
160     if (evt->evt == NULL) {
161         ckd_free(evt);
162         return NULL;
163     }
164     return evt;
165 }
166
167 void
168 sbevent_free(sbevent_t *evt)
169 {
170     CloseHandle(evt->evt);
171     ckd_free(evt);
172 }
173
174 int
175 sbevent_signal(sbevent_t *evt)
176 {
177     return SetEvent(evt->evt) ? 0 : -1;
178 }
179
180 int
181 sbevent_wait(sbevent_t *evt, int sec, int nsec)
182 {
183     DWORD rv;
184
185     rv = cond_timed_wait(evt->evt, sec, nsec);
186     return rv;
187 }
188
189 sbmtx_t *
190 sbmtx_init(void)
191 {
192     sbmtx_t *mtx;
193
194     mtx = ckd_calloc(1, sizeof(*mtx));
195     InitializeCriticalSection(&mtx->mtx);
196     return mtx;
197 }
198
199 int
200 sbmtx_trylock(sbmtx_t *mtx)
201 {
202     return TryEnterCriticalSection(&mtx->mtx) ? 0 : -1;
203 }
204
205 int
206 sbmtx_lock(sbmtx_t *mtx)
207 {
208     EnterCriticalSection(&mtx->mtx);
209     return 0;
210 }
211
212 int
213 sbmtx_unlock(sbmtx_t *mtx)
214 {
215     LeaveCriticalSection(&mtx->mtx);
216     return 0;
217 }
218
219 void
220 sbmtx_free(sbmtx_t *mtx)
221 {
222     DeleteCriticalSection(&mtx->mtx);
223     ckd_free(mtx);
224 }
225
226 sbmsgq_t *
227 sbmsgq_init(size_t depth)
228 {
229     sbmsgq_t *msgq;
230
231     msgq = ckd_calloc(1, sizeof(*msgq));
232     msgq->depth = depth;
233     msgq->evt = CreateEventW(NULL, FALSE, FALSE, NULL);
234     if (msgq->evt == NULL) {
235         ckd_free(msgq);
236         return NULL;
237     }
238     InitializeCriticalSection(&msgq->mtx);
239     msgq->data = ckd_calloc(depth, 1);
240     msgq->msg = ckd_calloc(depth, 1);
241     return msgq;
242 }
243
244 void
245 sbmsgq_free(sbmsgq_t *msgq)
246 {
247     CloseHandle(msgq->evt);
248     ckd_free(msgq->data);
249     ckd_free(msgq->msg);
250     ckd_free(msgq);
251 }
252
253 int
254 sbmsgq_send(sbmsgq_t *q, size_t len, void const *data)
255 {
256     char const *cdata = (char const *)data;
257     size_t in;
258
259     /* Don't allow things bigger than depth to be sent! */
260     if (len + sizeof(len) > q->depth)
261         return -1;
262
263     if (q->nbytes + len + sizeof(len) > q->depth)
264         WaitForSingleObject(q->evt, INFINITE);
265
266     /* Lock things while we manipulate the buffer (FIXME: this
267        actually should have been atomic with the wait above ...) */
268     EnterCriticalSection(&q->mtx);
269     in = (q->out + q->nbytes) % q->depth;
270     /* First write the size of the message. */
271     if (in + sizeof(len) > q->depth) {
272         /* Handle the annoying case where the size field gets wrapped around. */
273         size_t len1 = q->depth - in;
274         memcpy(q->data + in, &len, len1);
275         memcpy(q->data, ((char *)&len) + len1, sizeof(len) - len1);
276         q->nbytes += sizeof(len);
277         in = sizeof(len) - len1;
278     }
279     else {
280         memcpy(q->data + in, &len, sizeof(len));
281         q->nbytes += sizeof(len);
282         in += sizeof(len);
283     }
284
285     /* Now write the message body. */
286     if (in + len > q->depth) {
287         /* Handle wraparound. */
288         size_t len1 = q->depth - in;
289         memcpy(q->data + in, cdata, len1);
290         q->nbytes += len1;
291         cdata += len1;
292         len -= len1;
293         in = 0;
294     }
295     memcpy(q->data + in, cdata, len);
296     q->nbytes += len;
297
298     /* Signal the condition variable. */
299     SetEvent(q->evt);
300     /* Unlock. */
301     LeaveCriticalSection(&q->mtx);
302
303     return 0;
304 }
305
306 void *
307 sbmsgq_wait(sbmsgq_t *q, size_t *out_len, int sec, int nsec)
308 {
309     char *outptr;
310     size_t len;
311
312     /* Wait for data to be available. */
313     if (q->nbytes == 0) {
314         if (cond_timed_wait(q->evt, sec, nsec) == WAIT_FAILED)
315             /* Timed out or something... */
316             return NULL;
317     }
318     /* Lock to manipulate the queue (FIXME) */
319     EnterCriticalSection(&q->mtx);
320     /* Get the message size. */
321     if (q->out + sizeof(q->msglen) > q->depth) {
322         /* Handle annoying wraparound case. */
323         size_t len1 = q->depth - q->out;
324         memcpy(&q->msglen, q->data + q->out, len1);
325         memcpy(((char *)&q->msglen) + len1, q->data,
326                sizeof(q->msglen) - len1);
327         q->out = sizeof(q->msglen) - len1;
328     }
329     else {
330         memcpy(&q->msglen, q->data + q->out, sizeof(q->msglen));
331         q->out += sizeof(q->msglen);
332     }
333     q->nbytes -= sizeof(q->msglen);
334     /* Get the message body. */
335     outptr = q->msg;
336     len = q->msglen;
337     if (q->out + q->msglen > q->depth) {
338         /* Handle wraparound. */
339         size_t len1 = q->depth - q->out;
340         memcpy(outptr, q->data + q->out, len1);
341         outptr += len1;
342         len -= len1;
343         q->nbytes -= len1;
344         q->out = 0;
345     }
346     memcpy(outptr, q->data + q->out, len);
347     q->nbytes -= len;
348     q->out += len;
349
350     /* Signal the condition variable. */
351     SetEvent(q->evt);
352     /* Unlock. */
353     LeaveCriticalSection(&q->mtx);
354     if (out_len)
355         *out_len = q->msglen;
356     return q->msg;
357 }
358
359 #else /* POSIX */
360 #include <pthread.h>
361 #include <sys/time.h>
362
363 struct sbthread_s {
364     cmd_ln_t *config;
365     sbmsgq_t *msgq;
366     sbthread_main func;
367     void *arg;
368     pthread_t th;
369 };
370
371 struct sbmsgq_s {
372     /* Ringbuffer for passing messages. */
373     char *data;
374     size_t depth;
375     size_t out;
376     size_t nbytes;
377
378     /* Current message is stored here. */
379     char *msg;
380     size_t msglen;
381     pthread_mutex_t mtx;
382     pthread_cond_t cond;
383 };
384
385 struct sbevent_s {
386     pthread_mutex_t mtx;
387     pthread_cond_t cond;
388     int signalled;
389 };
390
391 struct sbmtx_s {
392     pthread_mutex_t mtx;
393 };
394
395 static void *
396 sbthread_internal_main(void *arg)
397 {
398     sbthread_t *th = (sbthread_t *)arg;
399     int rv;
400
401     rv = (*th->func)(th);
402     return (void *)(long)rv;
403 }
404
405 sbthread_t *
406 sbthread_start(cmd_ln_t *config, sbthread_main func, void *arg)
407 {
408     sbthread_t *th;
409     int rv;
410
411     th = ckd_calloc(1, sizeof(*th));
412     th->config = config;
413     th->func = func;
414     th->arg = arg;
415     th->msgq = sbmsgq_init(1024);
416     if ((rv = pthread_create(&th->th, NULL, &sbthread_internal_main, th)) != 0) {
417         E_ERROR("Failed to create thread: %d\n", rv);
418         sbthread_free(th);
419         return NULL;
420     }
421     return th;
422 }
423
424 int
425 sbthread_wait(sbthread_t *th)
426 {
427     void *exit;
428     int rv;
429
430     /* It has already been joined. */
431     if (th->th == (pthread_t)-1)
432         return -1;
433
434     rv = pthread_join(th->th, &exit);
435     if (rv != 0) {
436         E_ERROR("Failed to join thread: %d\n", rv);
437         return -1;
438     }
439     th->th = (pthread_t)-1;
440     return (int)(long)exit;
441 }
442
443 sbmsgq_t *
444 sbmsgq_init(size_t depth)
445 {
446     sbmsgq_t *msgq;
447
448     msgq = ckd_calloc(1, sizeof(*msgq));
449     msgq->depth = depth;
450     if (pthread_cond_init(&msgq->cond, NULL) != 0) {
451         ckd_free(msgq);
452         return NULL;
453     }
454     if (pthread_mutex_init(&msgq->mtx, NULL) != 0) {
455         pthread_cond_destroy(&msgq->cond);
456         ckd_free(msgq);
457         return NULL;
458     }
459     msgq->data = ckd_calloc(depth, 1);
460     msgq->msg = ckd_calloc(depth, 1);
461     return msgq;
462 }
463
464 void
465 sbmsgq_free(sbmsgq_t *msgq)
466 {
467     pthread_mutex_destroy(&msgq->mtx);
468     pthread_cond_destroy(&msgq->cond);
469     ckd_free(msgq->data);
470     ckd_free(msgq->msg);
471     ckd_free(msgq);
472 }
473
474 int
475 sbmsgq_send(sbmsgq_t *q, size_t len, void const *data)
476 {
477     size_t in;
478
479     /* Don't allow things bigger than depth to be sent! */
480     if (len + sizeof(len) > q->depth)
481         return -1;
482
483     /* Lock the condition variable while we manipulate the buffer. */
484     pthread_mutex_lock(&q->mtx);
485     if (q->nbytes + len + sizeof(len) > q->depth) {
486         /* Unlock and wait for space to be available. */
487         if (pthread_cond_wait(&q->cond, &q->mtx) != 0) {
488             /* Timed out, don't send anything. */
489             pthread_mutex_unlock(&q->mtx);
490             return -1;
491         }
492         /* Condition is now locked again. */
493     }
494     in = (q->out + q->nbytes) % q->depth;
495
496     /* First write the size of the message. */
497     if (in + sizeof(len) > q->depth) {
498         /* Handle the annoying case where the size field gets wrapped around. */
499         size_t len1 = q->depth - in;
500         memcpy(q->data + in, &len, len1);
501         memcpy(q->data, ((char *)&len) + len1, sizeof(len) - len1);
502         q->nbytes += sizeof(len);
503         in = sizeof(len) - len1;
504     }
505     else {
506         memcpy(q->data + in, &len, sizeof(len));
507         q->nbytes += sizeof(len);
508         in += sizeof(len);
509     }
510
511     /* Now write the message body. */
512     if (in + len > q->depth) {
513         /* Handle wraparound. */
514         size_t len1 = q->depth - in;
515         memcpy(q->data + in, data, len1);
516         q->nbytes += len1;
517         data = (char const *)data + len1;
518         len -= len1;
519         in = 0;
520     }
521     memcpy(q->data + in, data, len);
522     q->nbytes += len;
523
524     /* Signal the condition variable. */
525     pthread_cond_signal(&q->cond);
526     /* Unlock it, we have nothing else to do. */
527     pthread_mutex_unlock(&q->mtx);
528     return 0;
529 }
530
531 static int
532 cond_timed_wait(pthread_cond_t *cond, pthread_mutex_t *mtx, int sec, int nsec)
533 {
534     int rv;
535     if (sec == -1) {
536         rv = pthread_cond_wait(cond, mtx);
537     }
538     else {
539         struct timeval now;
540         struct timespec end;
541
542         gettimeofday(&now, NULL);
543         end.tv_sec = now.tv_sec + sec;
544         end.tv_nsec = now.tv_usec * 1000 + nsec;
545         if (end.tv_nsec > (1000*1000*1000)) {
546             sec += end.tv_nsec / (1000*1000*1000);
547             end.tv_nsec = end.tv_nsec % (1000*1000*1000);
548         }
549         rv = pthread_cond_timedwait(cond, mtx, &end);
550     }
551     return rv;
552 }
553
554 void *
555 sbmsgq_wait(sbmsgq_t *q, size_t *out_len, int sec, int nsec)
556 {
557     char *outptr;
558     size_t len;
559
560     /* Lock the condition variable while we manipulate nmsg. */
561     pthread_mutex_lock(&q->mtx);
562     if (q->nbytes == 0) {
563         /* Unlock the condition variable and wait for a signal. */
564         if (cond_timed_wait(&q->cond, &q->mtx, sec, nsec) != 0) {
565             /* Timed out or something... */
566             pthread_mutex_unlock(&q->mtx);
567             return NULL;
568         }
569         /* Condition variable is now locked again. */
570     }
571     /* Get the message size. */
572     if (q->out + sizeof(q->msglen) > q->depth) {
573         /* Handle annoying wraparound case. */
574         size_t len1 = q->depth - q->out;
575         memcpy(&q->msglen, q->data + q->out, len1);
576         memcpy(((char *)&q->msglen) + len1, q->data,
577                sizeof(q->msglen) - len1);
578         q->out = sizeof(q->msglen) - len1;
579     }
580     else {
581         memcpy(&q->msglen, q->data + q->out, sizeof(q->msglen));
582         q->out += sizeof(q->msglen);
583     }
584     q->nbytes -= sizeof(q->msglen);
585     /* Get the message body. */
586     outptr = q->msg;
587     len = q->msglen;
588     if (q->out + q->msglen > q->depth) {
589         /* Handle wraparound. */
590         size_t len1 = q->depth - q->out;
591         memcpy(outptr, q->data + q->out, len1);
592         outptr += len1;
593         len -= len1;
594         q->nbytes -= len1;
595         q->out = 0;
596     }
597     memcpy(outptr, q->data + q->out, len);
598     q->nbytes -= len;
599     q->out += len;
600
601     /* Signal the condition variable. */
602     pthread_cond_signal(&q->cond);
603     /* Unlock the condition variable, we are done. */
604     pthread_mutex_unlock(&q->mtx);
605     if (out_len)
606         *out_len = q->msglen;
607     return q->msg;
608 }
609
610 sbevent_t *
611 sbevent_init(void)
612 {
613     sbevent_t *evt;
614     int rv;
615
616     evt = ckd_calloc(1, sizeof(*evt));
617     if ((rv = pthread_mutex_init(&evt->mtx, NULL)) != 0) {
618         E_ERROR("Failed to initialize mutex: %d\n", rv);
619         ckd_free(evt);
620         return NULL;
621     }
622     if ((rv = pthread_cond_init(&evt->cond, NULL)) != 0) {
623         E_ERROR_SYSTEM("Failed to initialize mutex: %d\n", rv);
624         pthread_mutex_destroy(&evt->mtx);
625         ckd_free(evt);
626         return NULL;
627     }
628     return evt;
629 }
630
631 void
632 sbevent_free(sbevent_t *evt)
633 {
634     pthread_mutex_destroy(&evt->mtx);
635     pthread_cond_destroy(&evt->cond);
636     ckd_free(evt);
637 }
638
639 int
640 sbevent_signal(sbevent_t *evt)
641 {
642     int rv;
643
644     pthread_mutex_lock(&evt->mtx);
645     evt->signalled = TRUE;
646     rv = pthread_cond_signal(&evt->cond);
647     pthread_mutex_unlock(&evt->mtx);
648     return rv;
649 }
650
651 int
652 sbevent_wait(sbevent_t *evt, int sec, int nsec)
653 {
654     int rv = 0;
655
656     /* Lock the mutex before we check its signalled state. */
657     pthread_mutex_lock(&evt->mtx);
658     /* If it's not signalled, then wait until it is. */
659     if (!evt->signalled)
660         rv = cond_timed_wait(&evt->cond, &evt->mtx, sec, nsec);
661     /* Set its state to unsignalled if we were successful. */
662     if (rv == 0)
663         evt->signalled = FALSE;
664     /* And unlock its mutex. */
665     pthread_mutex_unlock(&evt->mtx);
666
667     return rv;
668 }
669
670 sbmtx_t *
671 sbmtx_init(void)
672 {
673     sbmtx_t *mtx;
674
675     mtx = ckd_calloc(1, sizeof(*mtx));
676     if (pthread_mutex_init(&mtx->mtx, NULL) != 0) {
677         ckd_free(mtx);
678         return NULL;
679     }
680     return mtx;
681 }
682
683 int
684 sbmtx_trylock(sbmtx_t *mtx)
685 {
686     return pthread_mutex_trylock(&mtx->mtx);
687 }
688
689 int
690 sbmtx_lock(sbmtx_t *mtx)
691 {
692     return pthread_mutex_lock(&mtx->mtx);
693 }
694
695 int
696 sbmtx_unlock(sbmtx_t *mtx)
697 {
698     return pthread_mutex_unlock(&mtx->mtx);
699 }
700
701 void
702 sbmtx_free(sbmtx_t *mtx)
703 {
704     pthread_mutex_destroy(&mtx->mtx);
705     ckd_free(mtx);
706 }
707 #endif /* not WIN32 */
708
709 cmd_ln_t *
710 sbthread_config(sbthread_t *th)
711 {
712     return th->config;
713 }
714
715 void *
716 sbthread_arg(sbthread_t *th)
717 {
718     return th->arg;
719 }
720
721 sbmsgq_t *
722 sbthread_msgq(sbthread_t *th)
723 {
724     return th->msgq;
725 }
726
727 int
728 sbthread_send(sbthread_t *th, size_t len, void const *data)
729 {
730     return sbmsgq_send(th->msgq, len, data);
731 }
732
733 void
734 sbthread_free(sbthread_t *th)
735 {
736     sbthread_wait(th);
737     sbmsgq_free(th->msgq);
738     ckd_free(th);
739 }