Tizen 2.1 base
[framework/base/fuse.git] / lib / fuse_loop_mt.c
1 /*
2   FUSE: Filesystem in Userspace
3   Copyright (C) 2001-2007  Miklos Szeredi <miklos@szeredi.hu>
4
5   This program can be distributed under the terms of the GNU LGPLv2.
6   See the file COPYING.LIB.
7 */
8
9 #include "fuse_lowlevel.h"
10 #include "fuse_misc.h"
11 #include "fuse_kernel.h"
12 #include "fuse_i.h"
13
14 #include <stdio.h>
15 #include <stdlib.h>
16 #include <string.h>
17 #include <unistd.h>
18 #include <signal.h>
19 #include <semaphore.h>
20 #include <errno.h>
21 #include <sys/time.h>
22
23 /* Environment var controlling the thread stack size */
24 #define ENVNAME_THREAD_STACK "FUSE_THREAD_STACK"
25
/*
 * Per-thread bookkeeping for one worker of the multi-threaded loop.
 * Workers are kept on a circular doubly linked list whose head is the
 * "main" member of struct fuse_mt.
 */
struct fuse_worker {
        /* Neighbours on the circular worker list */
        struct fuse_worker *prev, *next;
        /* Thread handle filled in when the worker thread is created */
        pthread_t thread_id;
        /* Size in bytes of the buffer pointed to by buf */
        size_t bufsize;
        /* Heap-allocated buffer that requests are received into */
        char *buf;
        /* Back pointer to the shared loop state */
        struct fuse_mt *mt;
};
34
35 struct fuse_mt {
36         pthread_mutex_t lock;
37         int numworker;
38         int numavail;
39         struct fuse_session *se;
40         struct fuse_chan *prevch;
41         struct fuse_worker main;
42         sem_t finish;
43         int exit;
44         int error;
45 };
46
47 static void list_add_worker(struct fuse_worker *w, struct fuse_worker *next)
48 {
49         struct fuse_worker *prev = next->prev;
50         w->next = next;
51         w->prev = prev;
52         prev->next = w;
53         next->prev = w;
54 }
55
56 static void list_del_worker(struct fuse_worker *w)
57 {
58         struct fuse_worker *prev = w->prev;
59         struct fuse_worker *next = w->next;
60         prev->next = next;
61         next->prev = prev;
62 }
63
64 static int fuse_loop_start_thread(struct fuse_mt *mt);
65
66 static void *fuse_do_work(void *data)
67 {
68         struct fuse_worker *w = (struct fuse_worker *) data;
69         struct fuse_mt *mt = w->mt;
70
71         while (!fuse_session_exited(mt->se)) {
72                 int isforget = 0;
73                 struct fuse_chan *ch = mt->prevch;
74                 struct fuse_buf fbuf = {
75                         .mem = w->buf,
76                         .size = w->bufsize,
77                 };
78                 int res;
79
80                 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
81                 res = fuse_session_receive_buf(mt->se, &fbuf, &ch);
82                 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
83                 if (res == -EINTR)
84                         continue;
85                 if (res <= 0) {
86                         if (res < 0) {
87                                 fuse_session_exit(mt->se);
88                                 mt->error = -1;
89                         }
90                         break;
91                 }
92
93                 pthread_mutex_lock(&mt->lock);
94                 if (mt->exit) {
95                         pthread_mutex_unlock(&mt->lock);
96                         return NULL;
97                 }
98
99                 /*
100                  * This disgusting hack is needed so that zillions of threads
101                  * are not created on a burst of FORGET messages
102                  */
103                 if (!(fbuf.flags & FUSE_BUF_IS_FD)) {
104                         struct fuse_in_header *in = fbuf.mem;
105
106                         if (in->opcode == FUSE_FORGET ||
107                             in->opcode == FUSE_BATCH_FORGET)
108                                 isforget = 1;
109                 }
110
111                 if (!isforget)
112                         mt->numavail--;
113                 if (mt->numavail == 0)
114                         fuse_loop_start_thread(mt);
115                 pthread_mutex_unlock(&mt->lock);
116
117                 fuse_session_process_buf(mt->se, &fbuf, ch);
118
119                 pthread_mutex_lock(&mt->lock);
120                 if (!isforget)
121                         mt->numavail++;
122                 if (mt->numavail > 10) {
123                         if (mt->exit) {
124                                 pthread_mutex_unlock(&mt->lock);
125                                 return NULL;
126                         }
127                         list_del_worker(w);
128                         mt->numavail--;
129                         mt->numworker--;
130                         pthread_mutex_unlock(&mt->lock);
131
132                         pthread_detach(w->thread_id);
133                         free(w->buf);
134                         free(w);
135                         return NULL;
136                 }
137                 pthread_mutex_unlock(&mt->lock);
138         }
139
140         sem_post(&mt->finish);
141
142         return NULL;
143 }
144
145 int fuse_start_thread(pthread_t *thread_id, void *(*func)(void *), void *arg)
146 {
147         sigset_t oldset;
148         sigset_t newset;
149         int res;
150         pthread_attr_t attr;
151         char *stack_size;
152
153         /* Override default stack size */
154         pthread_attr_init(&attr);
155         stack_size = getenv(ENVNAME_THREAD_STACK);
156         if (stack_size && pthread_attr_setstacksize(&attr, atoi(stack_size)))
157                 fprintf(stderr, "fuse: invalid stack size: %s\n", stack_size);
158
159         /* Disallow signal reception in worker threads */
160         sigemptyset(&newset);
161         sigaddset(&newset, SIGTERM);
162         sigaddset(&newset, SIGINT);
163         sigaddset(&newset, SIGHUP);
164         sigaddset(&newset, SIGQUIT);
165         pthread_sigmask(SIG_BLOCK, &newset, &oldset);
166         res = pthread_create(thread_id, &attr, func, arg);
167         pthread_sigmask(SIG_SETMASK, &oldset, NULL);
168         pthread_attr_destroy(&attr);
169         if (res != 0) {
170                 fprintf(stderr, "fuse: error creating thread: %s\n",
171                         strerror(res));
172                 return -1;
173         }
174
175         return 0;
176 }
177
178 static int fuse_loop_start_thread(struct fuse_mt *mt)
179 {
180         int res;
181         struct fuse_worker *w = malloc(sizeof(struct fuse_worker));
182         if (!w) {
183                 fprintf(stderr, "fuse: failed to allocate worker structure\n");
184                 return -1;
185         }
186         memset(w, 0, sizeof(struct fuse_worker));
187         w->bufsize = fuse_chan_bufsize(mt->prevch);
188         w->buf = malloc(w->bufsize);
189         w->mt = mt;
190         if (!w->buf) {
191                 fprintf(stderr, "fuse: failed to allocate read buffer\n");
192                 free(w);
193                 return -1;
194         }
195
196         res = fuse_start_thread(&w->thread_id, fuse_do_work, w);
197         if (res == -1) {
198                 free(w->buf);
199                 free(w);
200                 return -1;
201         }
202         list_add_worker(w, &mt->main);
203         mt->numavail ++;
204         mt->numworker ++;
205
206         return 0;
207 }
208
209 static void fuse_join_worker(struct fuse_mt *mt, struct fuse_worker *w)
210 {
211         pthread_join(w->thread_id, NULL);
212         pthread_mutex_lock(&mt->lock);
213         list_del_worker(w);
214         pthread_mutex_unlock(&mt->lock);
215         free(w->buf);
216         free(w);
217 }
218
219 int fuse_session_loop_mt(struct fuse_session *se)
220 {
221         int err;
222         struct fuse_mt mt;
223         struct fuse_worker *w;
224
225         memset(&mt, 0, sizeof(struct fuse_mt));
226         mt.se = se;
227         mt.prevch = fuse_session_next_chan(se, NULL);
228         mt.error = 0;
229         mt.numworker = 0;
230         mt.numavail = 0;
231         mt.main.thread_id = pthread_self();
232         mt.main.prev = mt.main.next = &mt.main;
233         sem_init(&mt.finish, 0, 0);
234         fuse_mutex_init(&mt.lock);
235
236         pthread_mutex_lock(&mt.lock);
237         err = fuse_loop_start_thread(&mt);
238         pthread_mutex_unlock(&mt.lock);
239         if (!err) {
240                 /* sem_wait() is interruptible */
241                 while (!fuse_session_exited(se))
242                         sem_wait(&mt.finish);
243
244                 for (w = mt.main.next; w != &mt.main; w = w->next)
245                         pthread_cancel(w->thread_id);
246                 mt.exit = 1;
247
248                 while (mt.main.next != &mt.main)
249                         fuse_join_worker(&mt, mt.main.next);
250
251                 err = mt.error;
252         }
253
254         pthread_mutex_destroy(&mt.lock);
255         sem_destroy(&mt.finish);
256         fuse_session_reset(se);
257         return err;
258 }