Proof of concept scriptlet queue, single or multi threaded.
authorjbj <devnull@localhost>
Tue, 18 Mar 2003 02:41:33 +0000 (02:41 +0000)
committerjbj <devnull@localhost>
Tue, 18 Mar 2003 02:41:33 +0000 (02:41 +0000)
CVS patchset: 6699
CVS date: 2003/03/18 02:41:33

lib/psm.c
lib/psm.h
lib/tthread.c
rpmio/rpmsq.c
rpmio/rpmsq.h

index e0b1dbb..5e5979d 100644 (file)
--- a/lib/psm.c
+++ b/lib/psm.c
@@ -9,7 +9,6 @@
 #include <rpmlib.h>
 #include <rpmmacro.h>
 #include <rpmurl.h>
-#include <rpmsq.h>
 
 #include "cpio.h"
 #include "fsm.h"               /* XXX CPIO_FOO/FSM_FOO constants */
@@ -456,77 +455,6 @@ static /*@observer@*/ const char * const tag2sln(int tag)
 }
 
 /**
- * Register a child reaper, then fork a child.
- * @param psm          package state machine data
- * @return             fork(2) pid
- */
-static pid_t psmRegisterFork(rpmpsm psm)
-       /*@globals fileSystem, internalState @*/
-       /*@modifies psm, fileSystem, internalState @*/
-{
-    sigset_t newMask, oldMask;
-
-    (void) sigfillset(&newMask);               /* block all signals */
-    (void) sigprocmask(SIG_BLOCK, &newMask, &oldMask);
-
-  if (psm->reaper) {
-    Insque(psm, NULL);
-/*@-modfilesys@*/
-if (_psm_debug)
-fprintf(stderr, "  Register: %p\n", psm);
-/*@=modfilesys@*/
-
-    (void) rpmsqEnable(SIGCHLD, NULL);
-  }
-
-    psm->reaped = 0;
-    if ((psm->child = fork()) != 0) {
-/*@-modfilesys@*/
-if (_psm_debug)
-fprintf(stderr, "      Fork: %p child %d\n", psm, psm->child);
-/*@=modfilesys@*/
-    }
-
-    (void) sigprocmask(SIG_SETMASK, &oldMask, NULL);
-
-    return psm->child;
-}
-
-/**
- * Unregister a child reaper.
- */
-static int psmWaitUnregister(rpmpsm psm, pid_t child)
-       /*@globals fileSystem, internalState @*/
-       /*@modifies fileSystem, internalState @*/
-{
-    sigset_t newMask, oldMask;
-
-    (void) sigfillset(&newMask);               /* block all signals */
-    (void) sigprocmask(SIG_BLOCK, &newMask, &oldMask);
-    
-    /*@-infloops@*/
-    while (psm->reaped != psm->child)
-       (void) sigsuspend(&oldMask);
-    /*@=infloops@*/
-
-/*@-modfilesys@*/
-if (_psm_debug)
-fprintf(stderr, "      Wait: %p child %d\n", psm, psm->child);
-/*@=modfilesys@*/
-
-    if (psm->reaper) {
-       Remque(psm);
-       (void) rpmsqEnable(-SIGCHLD, NULL);
-/*@-modfilesys@*/
-if (_psm_debug)
-fprintf(stderr, "Unregister: %p child %d\n", psm, child);
-/*@=modfilesys@*/
-    }
-
-    return sigprocmask(SIG_SETMASK, &oldMask, NULL);
-}
-
-/**
  * Wait for child process to be reaped.
  * @param psm          package state machine data
  * @return             
@@ -535,19 +463,13 @@ static pid_t psmWait(rpmpsm psm)
        /*@globals fileSystem, internalState @*/
        /*@modifies psm, fileSystem, internalState @*/
 {
-    if (psm->reaper) {
-       (void) psmWaitUnregister(psm, psm->child);
-    } else {
-       do {
-           psm->reaped = waitpid(psm->child, &psm->status, 0);
-       } while (psm->reaped >= 0 && psm->reaped != psm->child);
-    }
+    (void) rpmsqWait(&psm->sq);
 
     rpmMessage(RPMMESS_DEBUG, _("%s: waitpid(%d) rc %d status %x\n"),
-       psm->stepName, (unsigned)psm->child,
-       (unsigned)psm->reaped, psm->status);
+       psm->stepName, (unsigned)psm->sq.child,
+       (unsigned)psm->sq.reaped, psm->sq.status);
 
-    return psm->reaped;
+    return psm->sq.reaped;
 }
 
 /**
@@ -598,7 +520,8 @@ static rpmRC runScript(rpmpsm psm, Header h, const char * sln,
     int len;
     char * prefixBuf = NULL;
     const char * fn = NULL;
-    int i, xx;
+    int xx;
+    int i;
     int freePrefixes = 0;
     FD_t scriptFd;
     FD_t out;
@@ -608,10 +531,10 @@ static rpmRC runScript(rpmpsm psm, Header h, const char * sln,
     if (progArgv == NULL && script == NULL)
        return rc;
 
-    psm->child = 0;
-    psm->reaped = 0;
-    psm->status = 0;
-    psm->reaper = 0;
+    psm->sq.child = 0;
+    psm->sq.reaped = 0;
+    psm->sq.status = 0;
+    psm->sq.reaper = 1;
 
     /* XXX FIXME: except for %verifyscript, rpmteNEVR can be used. */
     xx = headerNVR(h, &n, &v, &r);
@@ -730,8 +653,8 @@ static rpmRC runScript(rpmpsm psm, Header h, const char * sln,
     if (out == NULL) return RPMRC_FAIL;        /* XXX can't happen */
     
     /*@-branchstate@*/
-    (void) psmRegisterFork(psm);
-    if (psm->child == 0) {
+    xx = rpmsqFork(&psm->sq);
+    if (psm->sq.child == 0) {
        const char * rootDir;
        int pipes[2];
 
@@ -818,16 +741,16 @@ fprintf(stderr, "      Exec: %s \"%s\"\n", sln, argv[0]);
 
     (void) psmWait(psm);
 
-    if (psm->reaped < 0) {
+    if (psm->sq.reaped < 0) {
        rpmError(RPMERR_SCRIPT,
                _("%s(%s-%s-%s) scriptlet failed, waitpid(%d) rc %d: %s\n"),
-                sln, n, v, r, psm->child, psm->reaped, strerror(errno));
+                sln, n, v, r, psm->sq.child, psm->sq.reaped, strerror(errno));
        rc = RPMRC_FAIL;
     } else
-    if (!WIFEXITED(psm->status) || WEXITSTATUS(psm->status)) {
+    if (!WIFEXITED(psm->sq.status) || WEXITSTATUS(psm->sq.status)) {
        rpmError(RPMERR_SCRIPT,
                _("%s(%s-%s-%s) scriptlet failed, exit status %d\n"),
-               sln, n, v, r, WEXITSTATUS(psm->status));
+               sln, n, v, r, WEXITSTATUS(psm->sq.status));
        rc = RPMRC_FAIL;
     }
 
index 1f847e1..2c5e502 100644 (file)
--- a/lib/psm.h
+++ b/lib/psm.h
@@ -6,6 +6,8 @@
  * Package state machine to handle a package from a transaction set.
  */
 
+#include <rpmsq.h>
+
 /*@-exportlocal@*/
 /*@unchecked@*/
 extern int _psm_debug;
@@ -60,11 +62,7 @@ typedef enum pkgStage_e {
 /**
  */
 struct rpmpsm_s {
-    void * q_forw;             /*!< for use by insque(3)/remque(3). */
-    void * q_back;
-    pid_t child;               /*!< Currently running process. */
-    pid_t reaped;              /*!< Reaped waitpid return. */
-    int status;                        /*!< Reaped waitpid status. */
+    struct rpmsqElem sq;       /*!< Scriptlet/signal queue element. */
 
 /*@refcounted@*/
     rpmts ts;                  /*!< transaction set */
@@ -95,7 +93,6 @@ struct rpmpsm_s {
     int countCorrection;       /*!< 0 if installing, -1 if removing. */
     int chrootDone;            /*!< Was chroot(2) done by pkgStage? */
     int unorderedSuccessor;    /*!< Can the PSM be run asynchronously? */
-    int reaper;                        /*!< Register SIGCHLD handler? */
     rpmCallbackType what;      /*!< Callback type. */
     unsigned long amount;      /*!< Callback amount. */
     unsigned long total;       /*!< Callback total. */
index 4adb5b6..3f9cc09 100644 (file)
@@ -1,12 +1,14 @@
-#include <stdio.h>
-#include <unistd.h>
-#include <fcntl.h>
+#include "system.h"
+
 #include <pthread.h>
 #include <assert.h>
 #include "rpmlib.h"
 #include "rpmts.h"
+#include "rpmsq.h"     /* XXX for _rpmsq_debug */
 #include "rpmio.h"
 
+#include "debug.h"
+
 extern int _psm_debug;
 
 static void *other_notify(const void *h,
@@ -16,67 +18,74 @@ static void *other_notify(const void *h,
                          fnpyKey key,
                          rpmCallbackData data)
 {
-  printf("notify %d %ld %ld\n", what, amount, total);
-
-  if(what == RPMCALLBACK_INST_OPEN_FILE)
-    return Fopen(key, "r");
-
-  return NULL;
+    static FD_t fd;
+
+    fprintf(stderr, "notify %d %ld %ld\n", what, amount, total);
+
+    switch (what) {
+    case RPMCALLBACK_INST_OPEN_FILE:
+       fd = Fopen(key, "r");
+       return fd;
+       break;
+    case RPMCALLBACK_INST_CLOSE_FILE:
+       if (fd != NULL) {
+           (void) Fclose(fd);
+           fd = NULL;
+       }
+       break;
+    default:
+       break;
+    }
+    return NULL;
 }
 
 static void *
 other_thread(void *dat)
 {
-  rpmts ts;
-  int fd, err;
-  FD_t fdt;
-  Header h = NULL;
-
-  rpmReadConfigFiles(NULL, NULL);
-  ts = rpmtsCreate();
-  assert(ts);
-  (void) rpmtsSetRootDir(ts, "/");
-
-  rpmIncreaseVerbosity();
-  rpmIncreaseVerbosity();
-
-  fd = open(dat, O_RDONLY);
-  assert(fd >= 0);
-  fdt = fdDup(fd);
-  rpmReadPackageFile(ts, fdt, "other_thread", &h);
-  Fclose(fdt);
-  close(fd);
-
-#if 0
-  err = rpmtsOpenDB(ts, O_RDWR);
-  assert(!err);
-#endif
-
-  err = rpmtsAddInstallElement(ts, h, dat, 1, NULL);
-
-  err = rpmtsSetNotifyCallback(ts, other_notify, NULL);
-  assert(!err);
-
-  err = rpmtsRun(ts, NULL, RPMPROB_FILTER_REPLACEPKG);
-  if(err)
-    printf("Run failed: %d\n", err);
-
-#if 0
-  err = rpmtsCloseDB(ts);
-  assert(!err);
-#endif
-
-  return NULL;
+    rpmts ts;
+    int err;
+    FD_t fd;
+    Header h = NULL;
+
+    rpmReadConfigFiles(NULL, NULL);
+    ts = rpmtsCreate();
+    assert(ts);
+    (void) rpmtsSetRootDir(ts, "/");
+
+    rpmIncreaseVerbosity();
+    rpmIncreaseVerbosity();
+
+    fd = Fopen(dat, "r.ufdio");
+    assert(fd != NULL);
+    rpmReadPackageFile(ts, fd, "other_thread", &h);
+    Fclose(fd);
+
+    err = rpmtsAddInstallElement(ts, h, dat, 1, NULL);
+
+    err = rpmtsSetNotifyCallback(ts, other_notify, NULL);
+    assert(!err);
+
+    err = rpmtsRun(ts, NULL, RPMPROB_FILTER_REPLACEPKG);
+    if(err)
+       fprintf(stderr, "Run failed: %d\n", err);
+
+    return NULL;
 }
 
 int main(int argc, char *argv[])
 {
-  pthread_t pth;
+    pthread_t pth;
+
+    _psm_debug = 1;
+    _rpmsq_debug = 1;
+
+    rpmsqEnable(SIGINT, NULL);
+    rpmsqEnable(SIGQUIT, NULL);
+    rpmsqEnable(SIGCHLD, NULL);
 
-  _psm_debug = 1;
-  pthread_create(&pth, NULL, other_thread, argv[1]);
+    pthread_create(&pth, NULL, other_thread, argv[1]);
 
-  pthread_join(pth, NULL);
+    pthread_join(pth, NULL);
 
-  return 0;
+    return 0;
 }
index 52180b6..15f48fe 100644 (file)
 #include "system.h"
                                                                                 
 #if defined(HAVE_PTHREAD_H) && !defined(__LCLINT__)
+
 #include <pthread.h>
-#endif
+
+#define        DO_LOCK()       pthread_mutex_lock(&rpmsigTbl_lock);
+#define        DO_UNLOCK()     pthread_mutex_unlock(&rpmsigTbl_lock);
+#define        INIT_LOCK()     \
+     { pthread_mutexattr_t attr; \
+       pthread_mutexattr_init(&attr); \
+       pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); \
+       pthread_mutex_init (&rpmsigTbl_lock, &attr); \
+       pthread_mutexattr_destroy(&attr); \
+       rpmsigTbl_sigchld->active = 0; \
+     }
+#define        ADD_REF(__tbl)  (__tbl)->active++
+#define        SUB_REF(__tbl)  --(__tbl)->active
+#define        CLEANUP_HANDLER(__handler, __arg, __oldtypeptr) \
+       pthread_setcanceltype (PTHREAD_CANCEL_ASYNCHRONOUS, (__oldtypeptr)); \
+       pthread_cleanup_push((__handler), (__arg));
+#define        CLEANUP_RESET(__execute, __oldtype) \
+       pthread_cleanup_pop(__execute); \
+       pthread_setcanceltype ((__oldtype), &(__oldtype));
+
+#define        SAME_THREAD(_a, _b)     pthread_equal(((pthread_t)_a), ((pthread_t)_b))
+
+#define        ME()    ((void *)pthread_self())
+
+#else
+
+#define        DO_LOCK()
+#define        DO_UNLOCK()
+#define        INIT_LOCK()
+#define        ADD_REF(__tbl)
+#define        SUB_REF(__tbl)
+#define        CLEANUP_HANDLER(__handler, __arg, __oldtypeptr)
+#define        CLEANUP_RESET(__execute, __oldtype)
+
+#define        SAME_THREAD(_a, _b)     (42)
+
+#define        ME()    (((void *))getpid())
+
+#endif /* HAVE_PTHREAD_H */
 
 #include <rpmsq.h>
 
 #include "debug.h"
 
+#define        _RPMSQ_DEBUG    0
+/*@unchecked@*/
+int _rpmsq_debug = _RPMSQ_DEBUG;
+
 /*@unchecked@*/
 static struct rpmsqElem rpmsqRock;
 /*@unchecked@*/
 rpmsq rpmsqQueue = &rpmsqRock;
 
-void Insque(void * elem, void * prev)
+int rpmsqInsert(void * elem, void * prev)
 {
-    if (elem != NULL)
-       insque(elem, (prev ? prev : rpmsqQueue));
+    sigset_t newMask, oldMask;
+    rpmsq sq = (rpmsq) elem;
+    int ret = -1;
+
+    if (sq != NULL) {
+#ifdef _RPMSQ_DEBUG
+/*@-modfilesys@*/
+if (_rpmsq_debug)
+fprintf(stderr, "    Insert(%p): %p\n", ME(), sq);
+/*@=modfilesys@*/
+#endif
+       ret = sigemptyset (&newMask);
+       ret = sigaddset (&newMask, SIGCHLD);
+       ret = sigprocmask(SIG_BLOCK, &newMask, &oldMask);
+       if (ret == 0) {
+           sq->child = 0;
+           sq->reaped = 0;
+           sq->status = 0;
+
+           sq->id = ME();
+           (void) pthread_mutex_init(&sq->mutex, NULL);
+           (void) pthread_cond_init(&sq->cond, NULL);
+           insque(elem, (prev ? prev : rpmsqQueue));
+           ret = sigprocmask(SIG_SETMASK, &oldMask, NULL);
+       }
+    }
+    return 0;
 }
 
-void Remque(void * elem)
+int rpmsqRemove(void * elem)
 {
-    if (elem != NULL)
-       remque(elem);
+    sigset_t newMask, oldMask;
+    rpmsq sq = (rpmsq) elem;
+    int ret = -1;
+
+    if (elem != NULL) {
+
+#ifdef _RPMSQ_DEBUG
+/*@-modfilesys@*/
+if (_rpmsq_debug)
+fprintf(stderr, "    Remove(%p): %p\n", ME(), sq);
+/*@=modfilesys@*/
+#endif
+       ret = sigemptyset (&newMask);
+       ret = sigaddset (&newMask, SIGCHLD);
+       ret = sigprocmask(SIG_BLOCK, &newMask, &oldMask);
+       if (ret == 0) {
+           remque(elem);
+           (void) pthread_cond_destroy(&sq->cond);
+           (void) pthread_mutex_destroy(&sq->mutex);
+           sq->id = NULL;
+           sq->child = 0;
+           sq->reaped = 0;
+           sq->status = 0;
+           ret = sigprocmask(SIG_SETMASK, &oldMask, NULL);
+       }
+    }
+    return ret;
 }
 
 /*@unchecked@*/
@@ -49,27 +142,6 @@ static struct rpmsig_s {
 #define        rpmsigTbl_sigquit       (&rpmsigTbl[1])
     { SIGCHLD, rpmsqHandler },
 #define        rpmsigTbl_sigchld       (&rpmsigTbl[2])
-
-#define        DO_LOCK()       pthread_mutex_lock(&rpmsigTbl_lock);
-#define        DO_UNLOCK()     pthread_mutex_unlock(&rpmsigTbl_lock);
-#define        INIT_LOCK()     \
-     { pthread_mutexattr_t attr; \
-       pthread_mutexattr_init(&attr); \
-       pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); \
-       pthread_mutex_init (&rpmsigTbl_lock, &attr); \
-       pthread_mutexattr_destroy(&attr); \
-       rpmsigTbl_sigchld->active = 0; \
-     }
-#define        ADD_REF(__tbl)  (__tbl)->active++
-#define        SUB_REF(__tbl)  --(__tbl)->active
-
-#define        CLEANUP_HANDLER(__handler, __arg, __oldtypeptr) \
-       pthread_setcanceltype (PTHREAD_CANCEL_ASYNCHRONOUS, (__oldtypeptr)); \
-       pthread_cleanup_push((__handler), (__arg));
-#define        CLEANUP_RESET(__execute, __oldtype) \
-       pthread_cleanup_pop(__execute); \
-       pthread_setcanceltype ((__oldtype), &(__oldtype));
-
     { SIGHUP,  rpmsqHandler },
 #define        rpmsigTbl_sighup        (&rpmsigTbl[3])
     { SIGTERM, rpmsqHandler },
@@ -80,11 +152,10 @@ static struct rpmsig_s {
 };
 /*@=fullinitblock@*/
 
-/**
- */
 /*@-incondefs@*/
 void rpmsqHandler(int signum)
 {
+    int save = errno;
     rpmsig tbl;
 
     for (tbl = rpmsigTbl; tbl->signum >= 0; tbl++) {
@@ -100,17 +171,42 @@ void rpmsqHandler(int signum)
                int status = 0;
                pid_t reaped = waitpid(0, &status, WNOHANG);
 
+               /* XXX errno set to ECHILD/EINVAL/EINTR. */
                if (reaped <= 0)
                    /*@innerbreak@*/ break;
 
+               /* XXX insque(3)/remque(3) are dequeue, not ring. */
                for (sq = rpmsqQueue->q_forw;
                     sq != NULL && sq != rpmsqQueue;
                     sq = sq->q_forw)
                {
+                   int same_thread;
                    if (sq->child != reaped)
                        /*@innercontinue@*/ continue;
+                   same_thread = SAME_THREAD(ME(), rpmsqQueue->id);
+#ifdef _RPMSQ_DEBUG_XXX
+/*@-modfilesys@*/
+if (_rpmsq_debug)
+fprintf(stderr, "      Reap(%p): %p child %d id %p same %d\n", ME(), sq, sq->child, sq->id, same_thread);
+/*@=modfilesys@*/
+#endif
                    sq->reaped = reaped;
                    sq->status = status;
+
+#ifdef HACK
+                   if (!SAME_THREAD(ME(), sq->id))
+#endif
+                   {
+
+#ifdef _RPMSQ_DEBUG_XXX
+/*@-modfilesys@*/
+if (_rpmsq_debug)
+fprintf(stderr, "    Signal(%p): %p child %d id %p\n", ME(), sq, sq->child, sq->id);
+/*@=modfilesys@*/
+#endif
+                       (void) pthread_cond_signal(&sq->cond);
+                   }
+
                    /*@innerbreak@*/ break;
                }
            }
@@ -120,18 +216,11 @@ void rpmsqHandler(int signum)
        }
        break;
     }
+    errno = save;
 }
 /*@=incondefs@*/
 
-/**
- * Enable or disable a signal handler.
- * @param signum       signal to enable (or disable if negative)
- * @param handler      signal handler (or NULL to use rpmsqHandler())
- * @return             no. of refs, -1 on error
- */
 int rpmsqEnable(int signum, /*@null@*/ sighandler_t handler)
-       /*@globals rpmsqCaught, rpmsigTbl @*/
-       /*@modifies rpmsqCaught, rpmsigTbl @*/
 {
     int tblsignum = (signum >= 0 ? signum : -signum);
     struct sigaction sa;
@@ -139,6 +228,8 @@ int rpmsqEnable(int signum, /*@null@*/ sighandler_t handler)
     int ret = -1;
 
     DO_LOCK ();
+    if (rpmsqQueue->id == NULL)
+       rpmsqQueue->id = ME();
     for (tbl = rpmsigTbl; tbl->signum >= 0; tbl++) {
        if (tblsignum != tbl->signum)
            continue;
@@ -169,6 +260,172 @@ int rpmsqEnable(int signum, /*@null@*/ sighandler_t handler)
     return ret;
 }
 
+pid_t rpmsqFork(rpmsq sq)
+{
+    sigset_t newMask, oldMask;
+    pid_t pid;
+    int pipes[2];
+    int xx;
+
+    if (sq->reaper) {
+       xx = rpmsqInsert(sq, NULL);
+#ifdef _RPMSQ_DEBUG
+/*@-modfilesys@*/
+if (_rpmsq_debug)
+fprintf(stderr, "    Enable(%p): %p\n", ME(), sq);
+/*@=modfilesys@*/
+#endif
+       xx = rpmsqEnable(SIGCHLD, NULL);
+    }
+
+    xx = pipe(pipes);
+
+    xx = sigemptyset (&newMask);
+    xx = sigaddset (&newMask, SIGCHLD);
+    xx = sigprocmask (SIG_BLOCK, &newMask, &oldMask);
+
+    pid = fork();
+    if (pid < (pid_t) 0) {             /* fork failed.  */
+       close(pipes[0]);
+       close(pipes[1]);
+       goto out;
+    } else if (pid == (pid_t) 0) {     /* Child. */
+       int yy;
+
+       /* Block to permit parent to wait. */
+       close(pipes[1]);
+       xx = read(pipes[0], &yy, sizeof(yy));
+       close(pipes[0]);
+
+#ifdef _RPMSQ_DEBUG
+/*@-modfilesys@*/
+if (_rpmsq_debug)
+fprintf(stderr, "     Child(%p): %p child %d\n", ME(), sq, getpid());
+/*@=modfilesys@*/
+#endif
+
+    } else {                           /* Parent. */
+
+       sq->child = pid;
+
+#ifdef _RPMSQ_DEBUG
+/*@-modfilesys@*/
+if (_rpmsq_debug)
+fprintf(stderr, "    Parent(%p): %p child %d\n", ME(), sq, sq->child);
+/*@=modfilesys@*/
+#endif
+
+       /* Unblock child. */
+       close(pipes[0]);
+       close(pipes[1]);
+
+    }
+
+out:
+    xx = sigprocmask (SIG_SETMASK, &oldMask, NULL);
+    return sq->child;
+}
+
+/**
+ * Wait for child process to be reaped, and unregister SIGCHLD handler.
+ * @param sq           scriptlet queue element
+ * @return             0 on success
+ */
+static int rpmsqWaitUnregister(rpmsq sq)
+       /*@globals fileSystem, internalState @*/
+       /*@modifies fileSystem, internalState @*/
+{
+    sigset_t newMask, oldMask;
+#ifdef HACK
+    int same_thread = SAME_THREAD(ME(), rpmsqQueue->id);
+#else
+    int same_thread = 0;
+#endif
+    int ret = 0;
+    int xx;
+
+    if (same_thread) {
+       ret = sigemptyset (&newMask);
+       ret = sigaddset (&newMask, SIGCHLD);
+       ret = sigprocmask(SIG_BLOCK, &newMask, &oldMask);
+    } else {
+    }
+
+    /*@-infloops@*/
+    while (ret == 0 && sq->reaped != sq->child) {
+       if (same_thread) {
+           ret = sigsuspend(&oldMask);
+       } else {
+           ret = pthread_mutex_lock(&sq->mutex);
+           ret = pthread_cond_wait(&sq->cond, &sq->mutex);
+           xx = pthread_mutex_unlock(&sq->mutex);
+       }
+    }
+    /*@=infloops@*/
+
+    if (same_thread) {
+       xx = sigprocmask(SIG_SETMASK, &oldMask, NULL);
+    } else {
+    }
+
+#ifdef _RPMSQ_DEBUG
+/*@-modfilesys@*/
+if (_rpmsq_debug)
+fprintf(stderr, "      Wake(%p): %p child %d reaper %d ret %d\n", ME(), sq, sq->child, sq->reaper, ret);
+/*@=modfilesys@*/
+#endif
+
+    xx = rpmsqRemove(sq);
+    xx = rpmsqEnable(-SIGCHLD, NULL);
+#ifdef _RPMSQ_DEBUG
+/*@-modfilesys@*/
+if (_rpmsq_debug)
+fprintf(stderr, "   Disable(%p): %p\n", ME(), sq);
+/*@=modfilesys@*/
+#endif
+
+    return ret;
+}
+
+pid_t rpmsqWait(rpmsq sq)
+{
+    int same_thread = SAME_THREAD(ME(), rpmsqQueue->id);
+
+#ifdef _RPMSQ_DEBUG
+/*@-modfilesys@*/
+if (_rpmsq_debug)
+fprintf(stderr, "      Wait(%p): %p child %d reaper %d same %d\n", ME(), sq, sq->child, sq->reaper, same_thread);
+/*@=modfilesys@*/
+#endif
+
+    if (sq->reaper) {
+       (void) rpmsqWaitUnregister(sq);
+    } else {
+       pid_t reaped;
+       int status;
+       do {
+           reaped = waitpid(sq->child, &status, 0);
+       } while (reaped >= 0 && reaped != sq->child);
+       sq->reaped = reaped;
+       sq->status = status;
+#ifdef _RPMSQ_DEBUG
+/*@-modfilesys@*/
+if (_rpmsq_debug)
+fprintf(stderr, "   Waitpid(%p): %p child %d reaped %d\n", ME(), sq, sq->child, sq->reaped);
+/*@=modfilesys@*/
+#endif
+    }
+
+#ifdef _RPMSQ_DEBUG
+/*@-modfilesys@*/
+if (_rpmsq_debug)
+fprintf(stderr, "      Fini(%p): %p child %d status 0x%x\n", ME(), sq, sq->child, sq->status);
+/*@=modfilesys@*/
+#endif
+
+    return sq->reaped;
+}
+
 /**
  * SIGCHLD cancellation handler.
  */
@@ -203,6 +460,7 @@ rpmsqExecve (const char ** argv)
     pid_t pid;
     pid_t result;
     sigset_t newMask, oldMask;
+    rpmsq sq = memset(alloca(sizeof(*sq)), 0, sizeof(*sq));
 
     DO_LOCK ();
     if (ADD_REF (rpmsigTbl_sigchld) == 0) {
@@ -256,7 +514,7 @@ rpmsqExecve (const char ** argv)
     DO_LOCK ();
     if ((SUB_REF (rpmsigTbl_sigchld) == 0 &&
         (rpmsqEnable(-SIGINT, NULL) < 0 || rpmsqEnable (-SIGQUIT, NULL) < 0))
-      || sigprocmask (SIG_SETMASK, &oldMask, (sigset_t *) NULL) != 0)
+      || sigprocmask (SIG_SETMASK, &oldMask, NULL) != 0)
     {
        status = -1;
     }
index 4fa96a7..9e3960e 100644 (file)
@@ -6,14 +6,20 @@
  *
  */
 
+#include <pthread.h>
 #include <signal.h>
 #include <sys/signal.h>
-#include <search.h>
+#include <search.h>            /* XXX insque(3)/remque(3) protos. */
 
 typedef struct rpmsig_s * rpmsig;
 
 typedef struct rpmsqElem * rpmsq;
 
+/*@-redecl@*/
+/*@unchecked@*/
+extern int _rpmsq_debug;
+/*@=redecl@*/
+
 /**
  * SIGCHLD queue element.
  */
@@ -21,8 +27,12 @@ struct rpmsqElem {
     struct rpmsqElem * q_forw; /*!< for use by insque(3)/remque(3). */
     struct rpmsqElem * q_back;
     pid_t child;               /*!< Currently running child. */
-    pid_t reaped;              /*!< Reaped waitpid(3) return. */
-    int status;                        /*!< Reaped waitpid(3) status. */
+    volatile pid_t reaped;     /*!< Reaped waitpid(3) return. */
+    volatile int status;       /*!< Reaped waitpid(3) status. */
+    int reaper;                        /*!< Register SIGCHLD handler? */
+    void * id;                 /*!< Blocking thread id (pthread_t). */
+    pthread_mutex_t mutex;
+    pthread_cond_t cond;
 };
 
 /*@unchecked@*/
@@ -37,13 +47,13 @@ extern sigset_t rpmsqCaught;
 
 /**
  */
-void Insque(/*@null@*/ void * elem, /*@null@*/ void * prev)
+int rpmsqInsert(/*@null@*/ void * elem, /*@null@*/ void * prev)
        /*@globals rpmsqQueue @*/
        /*@modifies elem, rpmsqQueue @*/;
 
 /**
  */
-void Remque(/*@null@*/ void * elem)
+int rpmsqRemove(/*@null@*/ void * elem)
        /*@modifies elem @*/;
 
 /**
@@ -63,10 +73,27 @@ int rpmsqEnable(int signum, /*@null@*/ sighandler_t handler)
        /*@modifies rpmsqCaught, fileSystem, internalState @*/;
 
 /**
+ * Fork a child process.
+ * @param sq           scriptlet queue element
+ * @return             fork(2) pid
+ */
+pid_t rpmsqFork(rpmsq sq)
+       /*@globals fileSystem, internalState @*/
+       /*@modifies sq, fileSystem, internalState @*/;
+
+/**
+ * Wait for child process to be reaped.
+ * @param sq           scriptlet queue element
+ * @return             reaped child pid
+ */
+pid_t rpmsqWait(rpmsq sq)
+       /*@globals fileSystem, internalState @*/
+       /*@modifies sq, fileSystem, internalState @*/;
+
+/**
  * Execute a command, returning its status.
  */
-int
-rpmsqExecve (const char ** argv)
+int rpmsqExecve (const char ** argv)
        /*@*/;
 
 #ifdef __cplusplus