diff options
author | jbj <devnull@localhost> | 2003-03-18 02:41:33 +0000 |
---|---|---|
committer | jbj <devnull@localhost> | 2003-03-18 02:41:33 +0000 |
commit | 5fe1914a155697b4f14483a394de9b25a4af5c19 (patch) | |
tree | 9baaa0532bad05157eff6ee73a159406277f4d56 /rpmio | |
parent | 1923ecbc300d330eae0cf898f9274d11d11df658 (diff) | |
download | rpm-5fe1914a155697b4f14483a394de9b25a4af5c19.tar.gz rpm-5fe1914a155697b4f14483a394de9b25a4af5c19.tar.bz2 rpm-5fe1914a155697b4f14483a394de9b25a4af5c19.zip |
Proof of concept scriptlet queue, single or multi threaded.
CVS patchset: 6699
CVS date: 2003/03/18 02:41:33
Diffstat (limited to 'rpmio')
-rw-r--r-- | rpmio/rpmsq.c | 336 | ||||
-rw-r--r-- | rpmio/rpmsq.h | 41 |
2 files changed, 331 insertions, 46 deletions
diff --git a/rpmio/rpmsq.c b/rpmio/rpmsq.c index 52180b64c..15f48fec8 100644 --- a/rpmio/rpmsq.c +++ b/rpmio/rpmsq.c @@ -5,28 +5,121 @@ #include "system.h" #if defined(HAVE_PTHREAD_H) && !defined(__LCLINT__) + #include <pthread.h> -#endif + +#define DO_LOCK() pthread_mutex_lock(&rpmsigTbl_lock); +#define DO_UNLOCK() pthread_mutex_unlock(&rpmsigTbl_lock); +#define INIT_LOCK() \ + { pthread_mutexattr_t attr; \ + pthread_mutexattr_init(&attr); \ + pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); \ + pthread_mutex_init (&rpmsigTbl_lock, &attr); \ + pthread_mutexattr_destroy(&attr); \ + rpmsigTbl_sigchld->active = 0; \ + } +#define ADD_REF(__tbl) (__tbl)->active++ +#define SUB_REF(__tbl) --(__tbl)->active +#define CLEANUP_HANDLER(__handler, __arg, __oldtypeptr) \ + pthread_setcanceltype (PTHREAD_CANCEL_ASYNCHRONOUS, (__oldtypeptr)); \ + pthread_cleanup_push((__handler), (__arg)); +#define CLEANUP_RESET(__execute, __oldtype) \ + pthread_cleanup_pop(__execute); \ + pthread_setcanceltype ((__oldtype), &(__oldtype)); + +#define SAME_THREAD(_a, _b) pthread_equal(((pthread_t)_a), ((pthread_t)_b)) + +#define ME() ((void *)pthread_self()) + +#else + +#define DO_LOCK() +#define DO_UNLOCK() +#define INIT_LOCK() +#define ADD_REF(__tbl) +#define SUB_REF(__tbl) +#define CLEANUP_HANDLER(__handler, __arg, __oldtypeptr) +#define CLEANUP_RESET(__execute, __oldtype) + +#define SAME_THREAD(_a, _b) (42) + +#define ME() (((void *))getpid()) + +#endif /* HAVE_PTHREAD_H */ #include <rpmsq.h> #include "debug.h" +#define _RPMSQ_DEBUG 0 +/*@unchecked@*/ +int _rpmsq_debug = _RPMSQ_DEBUG; + /*@unchecked@*/ static struct rpmsqElem rpmsqRock; /*@unchecked@*/ rpmsq rpmsqQueue = &rpmsqRock; -void Insque(void * elem, void * prev) +int rpmsqInsert(void * elem, void * prev) { - if (elem != NULL) - insque(elem, (prev ? prev : rpmsqQueue)); + sigset_t newMask, oldMask; + rpmsq sq = (rpmsq) elem; + int ret = -1; + + if (sq != NULL) { +#ifdef _RPMSQ_DEBUG +/*@-modfilesys@*/ +if (_rpmsq_debug) +fprintf(stderr, " Insert(%p): %p\n", ME(), sq); +/*@=modfilesys@*/ +#endif + ret = sigemptyset (&newMask); + ret = sigaddset (&newMask, SIGCHLD); + ret = sigprocmask(SIG_BLOCK, &newMask, &oldMask); + if (ret == 0) { + sq->child = 0; + sq->reaped = 0; + sq->status = 0; + + sq->id = ME(); + (void) pthread_mutex_init(&sq->mutex, NULL); + (void) pthread_cond_init(&sq->cond, NULL); + insque(elem, (prev ? prev : rpmsqQueue)); + ret = sigprocmask(SIG_SETMASK, &oldMask, NULL); + } + } + return 0; } -void Remque(void * elem) +int rpmsqRemove(void * elem) { - if (elem != NULL) - remque(elem); + sigset_t newMask, oldMask; + rpmsq sq = (rpmsq) elem; + int ret = -1; + + if (elem != NULL) { + +#ifdef _RPMSQ_DEBUG +/*@-modfilesys@*/ +if (_rpmsq_debug) +fprintf(stderr, " Remove(%p): %p\n", ME(), sq); +/*@=modfilesys@*/ +#endif + ret = sigemptyset (&newMask); + ret = sigaddset (&newMask, SIGCHLD); + ret = sigprocmask(SIG_BLOCK, &newMask, &oldMask); + if (ret == 0) { + remque(elem); + (void) pthread_cond_destroy(&sq->cond); + (void) pthread_mutex_destroy(&sq->mutex); + sq->id = NULL; + sq->child = 0; + sq->reaped = 0; + sq->status = 0; + ret = sigprocmask(SIG_SETMASK, &oldMask, NULL); + } + } + return ret; } /*@unchecked@*/ @@ -49,27 +142,6 @@ static struct rpmsig_s { #define rpmsigTbl_sigquit (&rpmsigTbl[1]) { SIGCHLD, rpmsqHandler }, #define rpmsigTbl_sigchld (&rpmsigTbl[2]) - -#define DO_LOCK() pthread_mutex_lock(&rpmsigTbl_lock); -#define DO_UNLOCK() pthread_mutex_unlock(&rpmsigTbl_lock); -#define INIT_LOCK() \ - { pthread_mutexattr_t attr; \ - pthread_mutexattr_init(&attr); \ - pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); \ - pthread_mutex_init (&rpmsigTbl_lock, &attr); \ - pthread_mutexattr_destroy(&attr); \ - rpmsigTbl_sigchld->active = 0; \ - } -#define ADD_REF(__tbl) (__tbl)->active++ -#define SUB_REF(__tbl) --(__tbl)->active - -#define CLEANUP_HANDLER(__handler, __arg, __oldtypeptr) \ - pthread_setcanceltype (PTHREAD_CANCEL_ASYNCHRONOUS, (__oldtypeptr)); \ - pthread_cleanup_push((__handler), (__arg)); -#define CLEANUP_RESET(__execute, __oldtype) \ - pthread_cleanup_pop(__execute); \ - pthread_setcanceltype ((__oldtype), &(__oldtype)); - { SIGHUP, rpmsqHandler }, #define rpmsigTbl_sighup (&rpmsigTbl[3]) { SIGTERM, rpmsqHandler }, @@ -80,11 +152,10 @@ static struct rpmsig_s { }; /*@=fullinitblock@*/ -/** - */ /*@-incondefs@*/ void rpmsqHandler(int signum) { + int save = errno; rpmsig tbl; for (tbl = rpmsigTbl; tbl->signum >= 0; tbl++) { @@ -100,17 +171,42 @@ void rpmsqHandler(int signum) int status = 0; pid_t reaped = waitpid(0, &status, WNOHANG); + /* XXX errno set to ECHILD/EINVAL/EINTR. */ if (reaped <= 0) /*@innerbreak@*/ break; + /* XXX insque(3)/remque(3) are dequeue, not ring. */ for (sq = rpmsqQueue->q_forw; sq != NULL && sq != rpmsqQueue; sq = sq->q_forw) { + int same_thread; if (sq->child != reaped) /*@innercontinue@*/ continue; + same_thread = SAME_THREAD(ME(), rpmsqQueue->id); +#ifdef _RPMSQ_DEBUG_XXX +/*@-modfilesys@*/ +if (_rpmsq_debug) +fprintf(stderr, " Reap(%p): %p child %d id %p same %d\n", ME(), sq, sq->child, sq->id, same_thread); +/*@=modfilesys@*/ +#endif sq->reaped = reaped; sq->status = status; + +#ifdef HACK + if (!SAME_THREAD(ME(), sq->id)) +#endif + { + +#ifdef _RPMSQ_DEBUG_XXX +/*@-modfilesys@*/ +if (_rpmsq_debug) +fprintf(stderr, " Signal(%p): %p child %d id %p\n", ME(), sq, sq->child, sq->id); +/*@=modfilesys@*/ +#endif + (void) pthread_cond_signal(&sq->cond); + } + /*@innerbreak@*/ break; } } @@ -120,18 +216,11 @@ void rpmsqHandler(int signum) } break; } + errno = save; } /*@=incondefs@*/ -/** - * Enable or disable a signal handler. - * @param signum signal to enable (or disable if negative) - * @param handler signal handler (or NULL to use rpmsqHandler()) - * @return no. of refs, -1 on error - */ int rpmsqEnable(int signum, /*@null@*/ sighandler_t handler) - /*@globals rpmsqCaught, rpmsigTbl @*/ - /*@modifies rpmsqCaught, rpmsigTbl @*/ { int tblsignum = (signum >= 0 ? signum : -signum); struct sigaction sa; @@ -139,6 +228,8 @@ int rpmsqEnable(int signum, /*@null@*/ sighandler_t handler) int ret = -1; DO_LOCK (); + if (rpmsqQueue->id == NULL) + rpmsqQueue->id = ME(); for (tbl = rpmsigTbl; tbl->signum >= 0; tbl++) { if (tblsignum != tbl->signum) continue; @@ -169,6 +260,172 @@ int rpmsqEnable(int signum, /*@null@*/ sighandler_t handler) return ret; } +pid_t rpmsqFork(rpmsq sq) +{ + sigset_t newMask, oldMask; + pid_t pid; + int pipes[2]; + int xx; + + if (sq->reaper) { + xx = rpmsqInsert(sq, NULL); +#ifdef _RPMSQ_DEBUG +/*@-modfilesys@*/ +if (_rpmsq_debug) +fprintf(stderr, " Enable(%p): %p\n", ME(), sq); +/*@=modfilesys@*/ +#endif + xx = rpmsqEnable(SIGCHLD, NULL); + } + + xx = pipe(pipes); + + xx = sigemptyset (&newMask); + xx = sigaddset (&newMask, SIGCHLD); + xx = sigprocmask (SIG_BLOCK, &newMask, &oldMask); + + pid = fork(); + if (pid < (pid_t) 0) { /* fork failed. */ + close(pipes[0]); + close(pipes[1]); + goto out; + } else if (pid == (pid_t) 0) { /* Child. */ + int yy; + + /* Block to permit parent to wait. */ + close(pipes[1]); + xx = read(pipes[0], &yy, sizeof(yy)); + close(pipes[0]); + +#ifdef _RPMSQ_DEBUG +/*@-modfilesys@*/ +if (_rpmsq_debug) +fprintf(stderr, " Child(%p): %p child %d\n", ME(), sq, getpid()); +/*@=modfilesys@*/ +#endif + + } else { /* Parent. */ + + sq->child = pid; + +#ifdef _RPMSQ_DEBUG +/*@-modfilesys@*/ +if (_rpmsq_debug) +fprintf(stderr, " Parent(%p): %p child %d\n", ME(), sq, sq->child); +/*@=modfilesys@*/ +#endif + + /* Unblock child. */ + close(pipes[0]); + close(pipes[1]); + + } + +out: + xx = sigprocmask (SIG_SETMASK, &oldMask, NULL); + return sq->child; +} + +/** + * Wait for child process to be reaped, and unregister SIGCHLD handler. + * @param sq scriptlet queue element + * @return 0 on success + */ +static int rpmsqWaitUnregister(rpmsq sq) + /*@globals fileSystem, internalState @*/ + /*@modifies fileSystem, internalState @*/ +{ + sigset_t newMask, oldMask; +#ifdef HACK + int same_thread = SAME_THREAD(ME(), rpmsqQueue->id); +#else + int same_thread = 0; +#endif + int ret = 0; + int xx; + + if (same_thread) { + ret = sigemptyset (&newMask); + ret = sigaddset (&newMask, SIGCHLD); + ret = sigprocmask(SIG_BLOCK, &newMask, &oldMask); + } else { + } + + /*@-infloops@*/ + while (ret == 0 && sq->reaped != sq->child) { + if (same_thread) { + ret = sigsuspend(&oldMask); + } else { + ret = pthread_mutex_lock(&sq->mutex); + ret = pthread_cond_wait(&sq->cond, &sq->mutex); + xx = pthread_mutex_unlock(&sq->mutex); + } + } + /*@=infloops@*/ + + if (same_thread) { + xx = sigprocmask(SIG_SETMASK, &oldMask, NULL); + } else { + } + +#ifdef _RPMSQ_DEBUG +/*@-modfilesys@*/ +if (_rpmsq_debug) +fprintf(stderr, " Wake(%p): %p child %d reaper %d ret %d\n", ME(), sq, sq->child, sq->reaper, ret); +/*@=modfilesys@*/ +#endif + + xx = rpmsqRemove(sq); + xx = rpmsqEnable(-SIGCHLD, NULL); +#ifdef _RPMSQ_DEBUG +/*@-modfilesys@*/ +if (_rpmsq_debug) +fprintf(stderr, " Disable(%p): %p\n", ME(), sq); +/*@=modfilesys@*/ +#endif + + return ret; +} + +pid_t rpmsqWait(rpmsq sq) +{ + int same_thread = SAME_THREAD(ME(), rpmsqQueue->id); + +#ifdef _RPMSQ_DEBUG +/*@-modfilesys@*/ +if (_rpmsq_debug) +fprintf(stderr, " Wait(%p): %p child %d reaper %d same %d\n", ME(), sq, sq->child, sq->reaper, same_thread); +/*@=modfilesys@*/ +#endif + + if (sq->reaper) { + (void) rpmsqWaitUnregister(sq); + } else { + pid_t reaped; + int status; + do { + reaped = waitpid(sq->child, &status, 0); + } while (reaped >= 0 && reaped != sq->child); + sq->reaped = reaped; + sq->status = status; +#ifdef _RPMSQ_DEBUG +/*@-modfilesys@*/ +if (_rpmsq_debug) +fprintf(stderr, " Waitpid(%p): %p child %d reaped %d\n", ME(), sq, sq->child, sq->reaped); +/*@=modfilesys@*/ +#endif + } + +#ifdef _RPMSQ_DEBUG +/*@-modfilesys@*/ +if (_rpmsq_debug) +fprintf(stderr, " Fini(%p): %p child %d status 0x%x\n", ME(), sq, sq->child, sq->status); +/*@=modfilesys@*/ +#endif + + return sq->reaped; +} + /** * SIGCHLD cancellation handler. */ @@ -203,6 +460,7 @@ rpmsqExecve (const char ** argv) pid_t pid; pid_t result; sigset_t newMask, oldMask; + rpmsq sq = memset(alloca(sizeof(*sq)), 0, sizeof(*sq)); DO_LOCK (); if (ADD_REF (rpmsigTbl_sigchld) == 0) { @@ -256,7 +514,7 @@ rpmsqExecve (const char ** argv) DO_LOCK (); if ((SUB_REF (rpmsigTbl_sigchld) == 0 && (rpmsqEnable(-SIGINT, NULL) < 0 || rpmsqEnable (-SIGQUIT, NULL) < 0)) - || sigprocmask (SIG_SETMASK, &oldMask, (sigset_t *) NULL) != 0) + || sigprocmask (SIG_SETMASK, &oldMask, NULL) != 0) { status = -1; } diff --git a/rpmio/rpmsq.h b/rpmio/rpmsq.h index 4fa96a72c..9e3960ebc 100644 --- a/rpmio/rpmsq.h +++ b/rpmio/rpmsq.h @@ -6,14 +6,20 @@ * */ +#include <pthread.h> #include <signal.h> #include <sys/signal.h> -#include <search.h> +#include <search.h> /* XXX insque(3)/remque(3) protos. */ typedef struct rpmsig_s * rpmsig; typedef struct rpmsqElem * rpmsq; +/*@-redecl@*/ +/*@unchecked@*/ +extern int _rpmsq_debug; +/*@=redecl@*/ + /** * SIGCHLD queue element. */ @@ -21,8 +27,12 @@ struct rpmsqElem { struct rpmsqElem * q_forw; /*!< for use by insque(3)/remque(3). */ struct rpmsqElem * q_back; pid_t child; /*!< Currently running child. */ - pid_t reaped; /*!< Reaped waitpid(3) return. */ - int status; /*!< Reaped waitpid(3) status. */ + volatile pid_t reaped; /*!< Reaped waitpid(3) return. */ + volatile int status; /*!< Reaped waitpid(3) status. */ + int reaper; /*!< Register SIGCHLD handler? */ + void * id; /*!< Blocking thread id (pthread_t). */ + pthread_mutex_t mutex; + pthread_cond_t cond; }; /*@unchecked@*/ @@ -37,13 +47,13 @@ extern sigset_t rpmsqCaught; /** */ -void Insque(/*@null@*/ void * elem, /*@null@*/ void * prev) +int rpmsqInsert(/*@null@*/ void * elem, /*@null@*/ void * prev) /*@globals rpmsqQueue @*/ /*@modifies elem, rpmsqQueue @*/; /** */ -void Remque(/*@null@*/ void * elem) +int rpmsqRemove(/*@null@*/ void * elem) /*@modifies elem @*/; /** @@ -63,10 +73,27 @@ int rpmsqEnable(int signum, /*@null@*/ sighandler_t handler) /*@modifies rpmsqCaught, fileSystem, internalState @*/; /** + * Fork a child process. + * @param sq scriptlet queue element + * @return fork(2) pid + */ +pid_t rpmsqFork(rpmsq sq) + /*@globals fileSystem, internalState @*/ + /*@modifies sq, fileSystem, internalState @*/; + +/** + * Wait for child process to be reaped. + * @param sq scriptlet queue element + * @return reaped child pid + */ +pid_t rpmsqWait(rpmsq sq) + /*@globals fileSystem, internalState @*/ + /*@modifies sq, fileSystem, internalState @*/; + +/** * Execute a command, returning its status. */ -int -rpmsqExecve (const char ** argv) +int rpmsqExecve (const char ** argv) /*@*/; #ifdef __cplusplus |