Lose all the stillborn code in rpmsq.[ch]
[platform/upstream/rpm.git] / rpmio / rpmsq.c
1 /** \ingroup rpmio
2  * \file rpmio/rpmsq.c
3  */
4
5 #include "system.h"
6
7 #include <signal.h>
8 #include <sys/signal.h>
9 #include <sys/wait.h>
10 #include <search.h>
11 #include <errno.h>
12 #include <stdio.h>
13
14 #if defined(HAVE_PTHREAD_H)
15
16 #include <pthread.h>
17
18 /* XXX suggested in bugzilla #159024 */
19 #if PTHREAD_MUTEX_DEFAULT != PTHREAD_MUTEX_NORMAL
20   #error RPM expects PTHREAD_MUTEX_DEFAULT == PTHREAD_MUTEX_NORMAL
21 #endif
22
23 #ifndef PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP
24 static pthread_mutex_t rpmsigTbl_lock = PTHREAD_MUTEX_INITIALIZER;
25 #else
26 static pthread_mutex_t rpmsigTbl_lock = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
27 #endif
28
29 #define DO_LOCK()       pthread_mutex_lock(&rpmsigTbl_lock);
30 #define DO_UNLOCK()     pthread_mutex_unlock(&rpmsigTbl_lock);
31 #define ADD_REF(__tbl)  (__tbl)->active++
32 #define SUB_REF(__tbl)  --(__tbl)->active
33
34 #define ME()    ((void *)pthread_self())
35
36 #else
37
38 #define DO_LOCK()
39 #define DO_UNLOCK()
40 #define ADD_REF(__tbl)  (0)
41 #define SUB_REF(__tbl)  (0)
42
43 #define ME()    (((void *)getpid()))
44
45 #endif  /* HAVE_PTHREAD_H */
46
47 #define _RPMSQ_INTERNAL
48 #include <rpm/rpmsq.h>
49
50 #include "debug.h"
51
52 #define _RPMSQ_DEBUG    0
53 int _rpmsq_debug = _RPMSQ_DEBUG;
54
55 static struct rpmsqElem rpmsqRock;
56
57 static rpmsq rpmsqQueue = &rpmsqRock;
58
59 /** \ingroup rpmsq
60  * Insert node into from queue.
61  * @param elem          node to link
62  * @param prev          previous node from queue
63  * @return              0 on success
64  */
65 static int rpmsqInsert(void * elem, void * prev)
66 {
67     rpmsq sq = (rpmsq) elem;
68     int ret = -1;
69
70     if (sq != NULL) {
71 #ifdef _RPMSQ_DEBUG
72 if (_rpmsq_debug)
73 fprintf(stderr, "    Insert(%p): %p\n", ME(), sq);
74 #endif
75         ret = sighold(SIGCHLD);
76         if (ret == 0) {
77             sq->child = 0;
78             sq->reaped = 0;
79             sq->status = 0;
80             sq->reaper = 1;
81             sq->pipes[0] = sq->pipes[1] = -1;
82
83             sq->id = ME();
84             ret = pthread_mutex_init(&sq->mutex, NULL);
85             insque(elem, (prev != NULL ? prev : rpmsqQueue));
86             ret = sigrelse(SIGCHLD);
87         }
88     }
89     return ret;
90 }
91
92 /** \ingroup rpmsq
93  * Remove node from queue.
94  * @param elem          node to link
95  * @return              0 on success
96  */
97 static int rpmsqRemove(void * elem)
98 {
99     rpmsq sq = (rpmsq) elem;
100     int ret = -1;
101
102     if (elem != NULL) {
103
104 #ifdef _RPMSQ_DEBUG
105 if (_rpmsq_debug)
106 fprintf(stderr, "    Remove(%p): %p\n", ME(), sq);
107 #endif
108         ret = sighold (SIGCHLD);
109         if (ret == 0) {
110             remque(elem);
111            
112             /* Unlock the mutex and then destroy it */ 
113             if((ret = pthread_mutex_unlock(&sq->mutex)) == 0)
114                 ret = pthread_mutex_destroy(&sq->mutex);
115
116             sq->id = NULL;
117             if (sq->pipes[1])   ret = close(sq->pipes[1]);
118             if (sq->pipes[0])   ret = close(sq->pipes[0]);
119             sq->pipes[0] = sq->pipes[1] = -1;
120 #ifdef  NOTYET  /* rpmpsmWait debugging message needs */
121             sq->reaper = 1;
122             sq->status = 0;
123             sq->reaped = 0;
124             sq->child = 0;
125 #endif
126             ret = sigrelse(SIGCHLD);
127         }
128     }
129     return ret;
130 }
131
132 static sigset_t rpmsqCaught;
133
134 static struct rpmsig_s {
135     int signum;
136     rpmsqAction_t handler;
137     int active;
138     struct sigaction oact;
139 } rpmsigTbl[] = {
140     { SIGINT,   rpmsqAction },
141 #define rpmsigTbl_sigint        (&rpmsigTbl[0])
142     { SIGQUIT,  rpmsqAction },
143 #define rpmsigTbl_sigquit       (&rpmsigTbl[1])
144     { SIGCHLD,  rpmsqAction },
145 #define rpmsigTbl_sigchld       (&rpmsigTbl[2])
146     { SIGHUP,   rpmsqAction },
147 #define rpmsigTbl_sighup        (&rpmsigTbl[3])
148     { SIGTERM,  rpmsqAction },
149 #define rpmsigTbl_sigterm       (&rpmsigTbl[4])
150     { SIGPIPE,  rpmsqAction },
151 #define rpmsigTbl_sigpipe       (&rpmsigTbl[5])
152     { -1,       NULL },
153 };
154
155 int rpmsqIsCaught(int signum)
156 {
157     return sigismember(&rpmsqCaught, signum);
158 }
159
160 #ifdef SA_SIGINFO
161 void rpmsqAction(int signum, siginfo_t * info, void * context)
162 #else
163 void rpmsqAction(int signum)
164 #endif
165 {
166     int save = errno;
167     rpmsig tbl;
168
169     for (tbl = rpmsigTbl; tbl->signum >= 0; tbl++) {
170         if (tbl->signum != signum)
171             continue;
172
173         (void) sigaddset(&rpmsqCaught, signum);
174
175         switch (signum) {
176         case SIGCHLD:
177             while (1) {
178                 rpmsq sq;
179                 int status = 0;
180                 pid_t reaped = waitpid(0, &status, WNOHANG);
181
182                 /* XXX errno set to ECHILD/EINVAL/EINTR. */
183                 if (reaped <= 0)
184                     break;
185
186                 /* XXX insque(3)/remque(3) are dequeue, not ring. */
187                 for (sq = rpmsqQueue->q_forw;
188                      sq != NULL && sq != rpmsqQueue;
189                      sq = sq->q_forw)
190                 {
191                     int ret;
192
193                     if (sq->child != reaped)
194                         continue;
195                     sq->reaped = reaped;
196                     sq->status = status;
197
198                     /* Unlock the mutex.  The waiter will then be able to 
199                      * aquire the lock.  
200                      *
201                      * XXX: jbj, wtd, if this fails? 
202                      */
203                     ret = pthread_mutex_unlock(&sq->mutex); 
204
205                     break;
206                 }
207             }
208             break;
209         default:
210             break;
211         }
212         break;
213     }
214     errno = save;
215 }
216
217 int rpmsqEnable(int signum, rpmsqAction_t handler)
218 {
219     int tblsignum = (signum >= 0 ? signum : -signum);
220     struct sigaction sa;
221     rpmsig tbl;
222     int ret = -1;
223
224     (void) DO_LOCK ();
225     if (rpmsqQueue->id == NULL)
226         rpmsqQueue->id = ME();
227     for (tbl = rpmsigTbl; tbl->signum >= 0; tbl++) {
228         if (tblsignum != tbl->signum)
229             continue;
230
231         if (signum >= 0) {                      /* Enable. */
232             if (ADD_REF(tbl) <= 0) {
233                 (void) sigdelset(&rpmsqCaught, tbl->signum);
234
235                 /* XXX Don't set a signal handler if already SIG_IGN */
236                 (void) sigaction(tbl->signum, NULL, &tbl->oact);
237                 if (tbl->oact.sa_handler == SIG_IGN)
238                     continue;
239
240                 (void) sigemptyset (&sa.sa_mask);
241 #ifdef SA_SIGINFO
242                 sa.sa_flags = SA_SIGINFO;
243 #else
244                 sa.sa_flags = 0;
245 #endif
246                 sa.sa_sigaction = (handler != NULL ? handler : tbl->handler);
247                 if (sigaction(tbl->signum, &sa, &tbl->oact) < 0) {
248                     SUB_REF(tbl);
249                     break;
250                 }
251                 tbl->active = 1;                /* XXX just in case */
252                 if (handler != NULL)
253                     tbl->handler = handler;
254             }
255         } else {                                /* Disable. */
256             if (SUB_REF(tbl) <= 0) {
257                 if (sigaction(tbl->signum, &tbl->oact, NULL) < 0)
258                     break;
259                 tbl->active = 0;                /* XXX just in case */
260                 tbl->handler = (handler != NULL ? handler : rpmsqAction);
261             }
262         }
263         ret = tbl->active;
264         break;
265     }
266     (void) DO_UNLOCK ();
267     return ret;
268 }
269
270 pid_t rpmsqFork(rpmsq sq)
271 {
272     pid_t pid;
273     int xx;
274     int nothreads = 0;   /* XXX: Shouldn't this be a global? */
275
276     if (sq->reaper) {
277         xx = rpmsqInsert(sq, NULL);
278 #ifdef _RPMSQ_DEBUG
279 if (_rpmsq_debug)
280 fprintf(stderr, "    Enable(%p): %p\n", ME(), sq);
281 #endif
282         xx = rpmsqEnable(SIGCHLD, NULL);
283     }
284
285     xx = pipe(sq->pipes);
286
287     xx = sighold(SIGCHLD);
288
289     /* 
290      * Initialize the cond var mutex.   We have to aquire the lock we 
291      * use for the condition before we fork.  Otherwise it is possible for
292      * the child to exit, we get sigchild and the sig handler to send 
293      * the condition signal before we are waiting on the condition.
294      */
295     if (!nothreads) {
296         if(pthread_mutex_lock(&sq->mutex)) {
297             /* Yack we did not get the lock, lets just give up */
298             xx = close(sq->pipes[0]);
299             xx = close(sq->pipes[1]);
300             sq->pipes[0] = sq->pipes[1] = -1;
301             goto out;
302         }
303     }
304
305     pid = fork();
306     if (pid < (pid_t) 0) {              /* fork failed.  */
307         sq->child = (pid_t)-1;
308         xx = close(sq->pipes[0]);
309         xx = close(sq->pipes[1]);
310         sq->pipes[0] = sq->pipes[1] = -1;
311         goto out;
312     } else if (pid == (pid_t) 0) {      /* Child. */
313         int yy;
314
315         /* Block to permit parent time to wait. */
316         xx = close(sq->pipes[1]);
317         xx = read(sq->pipes[0], &yy, sizeof(yy));
318         xx = close(sq->pipes[0]);
319         sq->pipes[0] = sq->pipes[1] = -1;
320
321 #ifdef _RPMSQ_DEBUG
322 if (_rpmsq_debug)
323 fprintf(stderr, "     Child(%p): %p child %d\n", ME(), sq, getpid());
324 #endif
325
326     } else {                            /* Parent. */
327
328         sq->child = pid;
329
330 #ifdef _RPMSQ_DEBUG
331 if (_rpmsq_debug)
332 fprintf(stderr, "    Parent(%p): %p child %d\n", ME(), sq, sq->child);
333 #endif
334
335     }
336
337 out:
338     xx = sigrelse(SIGCHLD);
339     return sq->child;
340 }
341
342 /**
343  * Wait for child process to be reaped, and unregister SIGCHLD handler.
344  * @todo Rewrite to use waitpid on helper thread.
345  * @param sq            scriptlet queue element
346  * @return              0 on success
347  */
348 static int rpmsqWaitUnregister(rpmsq sq)
349 {
350     int nothreads = 0;
351     int ret = 0;
352     int xx;
353
354     /* Protect sq->reaped from handler changes. */
355     ret = sighold(SIGCHLD);
356
357     /* Start the child, linux often runs child before parent. */
358     if (sq->pipes[0] >= 0)
359         xx = close(sq->pipes[0]);
360     if (sq->pipes[1] >= 0)
361         xx = close(sq->pipes[1]);
362     sq->pipes[0] = sq->pipes[1] = -1;
363
364     /* Put a stopwatch on the time spent waiting to measure performance gain. */
365     (void) rpmswEnter(&sq->op, -1);
366
367     /* Wait for handler to receive SIGCHLD. */
368     while (ret == 0 && sq->reaped != sq->child) {
369         if (nothreads)
370             /* Note that sigpause re-enables SIGCHLD. */
371             ret = sigpause(SIGCHLD);
372         else {
373             xx = sigrelse(SIGCHLD);
374             
375             /* 
376              * We start before the fork with this mutex locked;
377              * The only one that unlocks this the signal handler.
378              * So if we get the lock the child has been reaped.
379              */
380             ret = pthread_mutex_lock(&sq->mutex);
381             xx = sighold(SIGCHLD);
382         }
383     }
384
385     /* Accumulate stopwatch time spent waiting, potential performance gain. */
386     sq->ms_scriptlets += rpmswExit(&sq->op, -1)/1000;
387
388     xx = sigrelse(SIGCHLD);
389
390 #ifdef _RPMSQ_DEBUG
391 if (_rpmsq_debug)
392 fprintf(stderr, "      Wake(%p): %p child %d reaper %d ret %d\n", ME(), sq, sq->child, sq->reaper, ret);
393 #endif
394
395     /* Remove processed SIGCHLD item from queue. */
396     xx = rpmsqRemove(sq);
397
398     /* Disable SIGCHLD handler on refcount == 0. */
399     xx = rpmsqEnable(-SIGCHLD, NULL);
400 #ifdef _RPMSQ_DEBUG
401 if (_rpmsq_debug)
402 fprintf(stderr, "   Disable(%p): %p\n", ME(), sq);
403 #endif
404
405     return ret;
406 }
407
408 pid_t rpmsqWait(rpmsq sq)
409 {
410
411 #ifdef _RPMSQ_DEBUG
412 if (_rpmsq_debug)
413 fprintf(stderr, "      Wait(%p): %p child %d reaper %d\n", ME(), sq, sq->child, sq->reaper);
414 #endif
415
416     if (sq->reaper) {
417         (void) rpmsqWaitUnregister(sq);
418     } else {
419         pid_t reaped;
420         int status;
421         do {
422             reaped = waitpid(sq->child, &status, 0);
423         } while (reaped >= 0 && reaped != sq->child);
424         sq->reaped = reaped;
425         sq->status = status;
426 #ifdef _RPMSQ_DEBUG
427 if (_rpmsq_debug)
428 fprintf(stderr, "   Waitpid(%p): %p child %d reaped %d\n", ME(), sq, sq->child, sq->reaped);
429 #endif
430     }
431
432 #ifdef _RPMSQ_DEBUG
433 if (_rpmsq_debug)
434 fprintf(stderr, "      Fini(%p): %p child %d status 0x%x\n", ME(), sq, sq->child, sq->status);
435 #endif
436
437     return sq->reaped;
438 }