summaryrefslogtreecommitdiff
path: root/rpmio
diff options
context:
space:
mode:
authorjbj <devnull@localhost>2003-03-18 02:41:33 +0000
committerjbj <devnull@localhost>2003-03-18 02:41:33 +0000
commit5fe1914a155697b4f14483a394de9b25a4af5c19 (patch)
tree9baaa0532bad05157eff6ee73a159406277f4d56 /rpmio
parent1923ecbc300d330eae0cf898f9274d11d11df658 (diff)
downloadrpm-5fe1914a155697b4f14483a394de9b25a4af5c19.tar.gz
rpm-5fe1914a155697b4f14483a394de9b25a4af5c19.tar.bz2
rpm-5fe1914a155697b4f14483a394de9b25a4af5c19.zip
Proof of concept scriptlet queue, single or multi threaded.
CVS patchset: 6699 CVS date: 2003/03/18 02:41:33
Diffstat (limited to 'rpmio')
-rw-r--r--rpmio/rpmsq.c336
-rw-r--r--rpmio/rpmsq.h41
2 files changed, 331 insertions, 46 deletions
diff --git a/rpmio/rpmsq.c b/rpmio/rpmsq.c
index 52180b64c..15f48fec8 100644
--- a/rpmio/rpmsq.c
+++ b/rpmio/rpmsq.c
@@ -5,28 +5,121 @@
#include "system.h"
#if defined(HAVE_PTHREAD_H) && !defined(__LCLINT__)
+
#include <pthread.h>
-#endif
+
+#define DO_LOCK() pthread_mutex_lock(&rpmsigTbl_lock);
+#define DO_UNLOCK() pthread_mutex_unlock(&rpmsigTbl_lock);
+#define INIT_LOCK() \
+ { pthread_mutexattr_t attr; \
+ pthread_mutexattr_init(&attr); \
+ pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); \
+ pthread_mutex_init (&rpmsigTbl_lock, &attr); \
+ pthread_mutexattr_destroy(&attr); \
+ rpmsigTbl_sigchld->active = 0; \
+ }
+#define ADD_REF(__tbl) (__tbl)->active++
+#define SUB_REF(__tbl) --(__tbl)->active
+#define CLEANUP_HANDLER(__handler, __arg, __oldtypeptr) \
+ pthread_setcanceltype (PTHREAD_CANCEL_ASYNCHRONOUS, (__oldtypeptr)); \
+ pthread_cleanup_push((__handler), (__arg));
+#define CLEANUP_RESET(__execute, __oldtype) \
+ pthread_cleanup_pop(__execute); \
+ pthread_setcanceltype ((__oldtype), &(__oldtype));
+
+#define SAME_THREAD(_a, _b) pthread_equal(((pthread_t)_a), ((pthread_t)_b))
+
+#define ME() ((void *)pthread_self())
+
+#else
+
+#define DO_LOCK()
+#define DO_UNLOCK()
+#define INIT_LOCK()
+#define ADD_REF(__tbl)
+#define SUB_REF(__tbl)
+#define CLEANUP_HANDLER(__handler, __arg, __oldtypeptr)
+#define CLEANUP_RESET(__execute, __oldtype)
+
+#define SAME_THREAD(_a, _b) (42)
+
+#define ME() (((void *))getpid())
+
+#endif /* HAVE_PTHREAD_H */
#include <rpmsq.h>
#include "debug.h"
+#define _RPMSQ_DEBUG 0
+/*@unchecked@*/
+int _rpmsq_debug = _RPMSQ_DEBUG;
+
/*@unchecked@*/
static struct rpmsqElem rpmsqRock;
/*@unchecked@*/
rpmsq rpmsqQueue = &rpmsqRock;
-void Insque(void * elem, void * prev)
+int rpmsqInsert(void * elem, void * prev)
{
- if (elem != NULL)
- insque(elem, (prev ? prev : rpmsqQueue));
+ sigset_t newMask, oldMask;
+ rpmsq sq = (rpmsq) elem;
+ int ret = -1;
+
+ if (sq != NULL) {
+#ifdef _RPMSQ_DEBUG
+/*@-modfilesys@*/
+if (_rpmsq_debug)
+fprintf(stderr, " Insert(%p): %p\n", ME(), sq);
+/*@=modfilesys@*/
+#endif
+ ret = sigemptyset (&newMask);
+ ret = sigaddset (&newMask, SIGCHLD);
+ ret = sigprocmask(SIG_BLOCK, &newMask, &oldMask);
+ if (ret == 0) {
+ sq->child = 0;
+ sq->reaped = 0;
+ sq->status = 0;
+
+ sq->id = ME();
+ (void) pthread_mutex_init(&sq->mutex, NULL);
+ (void) pthread_cond_init(&sq->cond, NULL);
+ insque(elem, (prev ? prev : rpmsqQueue));
+ ret = sigprocmask(SIG_SETMASK, &oldMask, NULL);
+ }
+ }
+ return 0;
}
-void Remque(void * elem)
+int rpmsqRemove(void * elem)
{
- if (elem != NULL)
- remque(elem);
+ sigset_t newMask, oldMask;
+ rpmsq sq = (rpmsq) elem;
+ int ret = -1;
+
+ if (elem != NULL) {
+
+#ifdef _RPMSQ_DEBUG
+/*@-modfilesys@*/
+if (_rpmsq_debug)
+fprintf(stderr, " Remove(%p): %p\n", ME(), sq);
+/*@=modfilesys@*/
+#endif
+ ret = sigemptyset (&newMask);
+ ret = sigaddset (&newMask, SIGCHLD);
+ ret = sigprocmask(SIG_BLOCK, &newMask, &oldMask);
+ if (ret == 0) {
+ remque(elem);
+ (void) pthread_cond_destroy(&sq->cond);
+ (void) pthread_mutex_destroy(&sq->mutex);
+ sq->id = NULL;
+ sq->child = 0;
+ sq->reaped = 0;
+ sq->status = 0;
+ ret = sigprocmask(SIG_SETMASK, &oldMask, NULL);
+ }
+ }
+ return ret;
}
/*@unchecked@*/
@@ -49,27 +142,6 @@ static struct rpmsig_s {
#define rpmsigTbl_sigquit (&rpmsigTbl[1])
{ SIGCHLD, rpmsqHandler },
#define rpmsigTbl_sigchld (&rpmsigTbl[2])
-
-#define DO_LOCK() pthread_mutex_lock(&rpmsigTbl_lock);
-#define DO_UNLOCK() pthread_mutex_unlock(&rpmsigTbl_lock);
-#define INIT_LOCK() \
- { pthread_mutexattr_t attr; \
- pthread_mutexattr_init(&attr); \
- pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); \
- pthread_mutex_init (&rpmsigTbl_lock, &attr); \
- pthread_mutexattr_destroy(&attr); \
- rpmsigTbl_sigchld->active = 0; \
- }
-#define ADD_REF(__tbl) (__tbl)->active++
-#define SUB_REF(__tbl) --(__tbl)->active
-
-#define CLEANUP_HANDLER(__handler, __arg, __oldtypeptr) \
- pthread_setcanceltype (PTHREAD_CANCEL_ASYNCHRONOUS, (__oldtypeptr)); \
- pthread_cleanup_push((__handler), (__arg));
-#define CLEANUP_RESET(__execute, __oldtype) \
- pthread_cleanup_pop(__execute); \
- pthread_setcanceltype ((__oldtype), &(__oldtype));
-
{ SIGHUP, rpmsqHandler },
#define rpmsigTbl_sighup (&rpmsigTbl[3])
{ SIGTERM, rpmsqHandler },
@@ -80,11 +152,10 @@ static struct rpmsig_s {
};
/*@=fullinitblock@*/
-/**
- */
/*@-incondefs@*/
void rpmsqHandler(int signum)
{
+ int save = errno;
rpmsig tbl;
for (tbl = rpmsigTbl; tbl->signum >= 0; tbl++) {
@@ -100,17 +171,42 @@ void rpmsqHandler(int signum)
int status = 0;
pid_t reaped = waitpid(0, &status, WNOHANG);
+ /* XXX errno set to ECHILD/EINVAL/EINTR. */
if (reaped <= 0)
/*@innerbreak@*/ break;
+ /* XXX insque(3)/remque(3) are dequeue, not ring. */
for (sq = rpmsqQueue->q_forw;
sq != NULL && sq != rpmsqQueue;
sq = sq->q_forw)
{
+ int same_thread;
if (sq->child != reaped)
/*@innercontinue@*/ continue;
+ same_thread = SAME_THREAD(ME(), rpmsqQueue->id);
+#ifdef _RPMSQ_DEBUG_XXX
+/*@-modfilesys@*/
+if (_rpmsq_debug)
+fprintf(stderr, " Reap(%p): %p child %d id %p same %d\n", ME(), sq, sq->child, sq->id, same_thread);
+/*@=modfilesys@*/
+#endif
sq->reaped = reaped;
sq->status = status;
+
+#ifdef HACK
+ if (!SAME_THREAD(ME(), sq->id))
+#endif
+ {
+
+#ifdef _RPMSQ_DEBUG_XXX
+/*@-modfilesys@*/
+if (_rpmsq_debug)
+fprintf(stderr, " Signal(%p): %p child %d id %p\n", ME(), sq, sq->child, sq->id);
+/*@=modfilesys@*/
+#endif
+ (void) pthread_cond_signal(&sq->cond);
+ }
+
/*@innerbreak@*/ break;
}
}
@@ -120,18 +216,11 @@ void rpmsqHandler(int signum)
}
break;
}
+ errno = save;
}
/*@=incondefs@*/
-/**
- * Enable or disable a signal handler.
- * @param signum signal to enable (or disable if negative)
- * @param handler signal handler (or NULL to use rpmsqHandler())
- * @return no. of refs, -1 on error
- */
int rpmsqEnable(int signum, /*@null@*/ sighandler_t handler)
- /*@globals rpmsqCaught, rpmsigTbl @*/
- /*@modifies rpmsqCaught, rpmsigTbl @*/
{
int tblsignum = (signum >= 0 ? signum : -signum);
struct sigaction sa;
@@ -139,6 +228,8 @@ int rpmsqEnable(int signum, /*@null@*/ sighandler_t handler)
int ret = -1;
DO_LOCK ();
+ if (rpmsqQueue->id == NULL)
+ rpmsqQueue->id = ME();
for (tbl = rpmsigTbl; tbl->signum >= 0; tbl++) {
if (tblsignum != tbl->signum)
continue;
@@ -169,6 +260,172 @@ int rpmsqEnable(int signum, /*@null@*/ sighandler_t handler)
return ret;
}
+pid_t rpmsqFork(rpmsq sq)
+{
+ sigset_t newMask, oldMask;
+ pid_t pid;
+ int pipes[2];
+ int xx;
+
+ if (sq->reaper) {
+ xx = rpmsqInsert(sq, NULL);
+#ifdef _RPMSQ_DEBUG
+/*@-modfilesys@*/
+if (_rpmsq_debug)
+fprintf(stderr, " Enable(%p): %p\n", ME(), sq);
+/*@=modfilesys@*/
+#endif
+ xx = rpmsqEnable(SIGCHLD, NULL);
+ }
+
+ xx = pipe(pipes);
+
+ xx = sigemptyset (&newMask);
+ xx = sigaddset (&newMask, SIGCHLD);
+ xx = sigprocmask (SIG_BLOCK, &newMask, &oldMask);
+
+ pid = fork();
+ if (pid < (pid_t) 0) { /* fork failed. */
+ close(pipes[0]);
+ close(pipes[1]);
+ goto out;
+ } else if (pid == (pid_t) 0) { /* Child. */
+ int yy;
+
+ /* Block to permit parent to wait. */
+ close(pipes[1]);
+ xx = read(pipes[0], &yy, sizeof(yy));
+ close(pipes[0]);
+
+#ifdef _RPMSQ_DEBUG
+/*@-modfilesys@*/
+if (_rpmsq_debug)
+fprintf(stderr, " Child(%p): %p child %d\n", ME(), sq, getpid());
+/*@=modfilesys@*/
+#endif
+
+ } else { /* Parent. */
+
+ sq->child = pid;
+
+#ifdef _RPMSQ_DEBUG
+/*@-modfilesys@*/
+if (_rpmsq_debug)
+fprintf(stderr, " Parent(%p): %p child %d\n", ME(), sq, sq->child);
+/*@=modfilesys@*/
+#endif
+
+ /* Unblock child. */
+ close(pipes[0]);
+ close(pipes[1]);
+
+ }
+
+out:
+ xx = sigprocmask (SIG_SETMASK, &oldMask, NULL);
+ return sq->child;
+}
+
+/**
+ * Wait for child process to be reaped, and unregister SIGCHLD handler.
+ * @param sq scriptlet queue element
+ * @return 0 on success
+ */
+static int rpmsqWaitUnregister(rpmsq sq)
+ /*@globals fileSystem, internalState @*/
+ /*@modifies fileSystem, internalState @*/
+{
+ sigset_t newMask, oldMask;
+#ifdef HACK
+ int same_thread = SAME_THREAD(ME(), rpmsqQueue->id);
+#else
+ int same_thread = 0;
+#endif
+ int ret = 0;
+ int xx;
+
+ if (same_thread) {
+ ret = sigemptyset (&newMask);
+ ret = sigaddset (&newMask, SIGCHLD);
+ ret = sigprocmask(SIG_BLOCK, &newMask, &oldMask);
+ } else {
+ }
+
+ /*@-infloops@*/
+ while (ret == 0 && sq->reaped != sq->child) {
+ if (same_thread) {
+ ret = sigsuspend(&oldMask);
+ } else {
+ ret = pthread_mutex_lock(&sq->mutex);
+ ret = pthread_cond_wait(&sq->cond, &sq->mutex);
+ xx = pthread_mutex_unlock(&sq->mutex);
+ }
+ }
+ /*@=infloops@*/
+
+ if (same_thread) {
+ xx = sigprocmask(SIG_SETMASK, &oldMask, NULL);
+ } else {
+ }
+
+#ifdef _RPMSQ_DEBUG
+/*@-modfilesys@*/
+if (_rpmsq_debug)
+fprintf(stderr, " Wake(%p): %p child %d reaper %d ret %d\n", ME(), sq, sq->child, sq->reaper, ret);
+/*@=modfilesys@*/
+#endif
+
+ xx = rpmsqRemove(sq);
+ xx = rpmsqEnable(-SIGCHLD, NULL);
+#ifdef _RPMSQ_DEBUG
+/*@-modfilesys@*/
+if (_rpmsq_debug)
+fprintf(stderr, " Disable(%p): %p\n", ME(), sq);
+/*@=modfilesys@*/
+#endif
+
+ return ret;
+}
+
+pid_t rpmsqWait(rpmsq sq)
+{
+ int same_thread = SAME_THREAD(ME(), rpmsqQueue->id);
+
+#ifdef _RPMSQ_DEBUG
+/*@-modfilesys@*/
+if (_rpmsq_debug)
+fprintf(stderr, " Wait(%p): %p child %d reaper %d same %d\n", ME(), sq, sq->child, sq->reaper, same_thread);
+/*@=modfilesys@*/
+#endif
+
+ if (sq->reaper) {
+ (void) rpmsqWaitUnregister(sq);
+ } else {
+ pid_t reaped;
+ int status;
+ do {
+ reaped = waitpid(sq->child, &status, 0);
+ } while (reaped >= 0 && reaped != sq->child);
+ sq->reaped = reaped;
+ sq->status = status;
+#ifdef _RPMSQ_DEBUG
+/*@-modfilesys@*/
+if (_rpmsq_debug)
+fprintf(stderr, " Waitpid(%p): %p child %d reaped %d\n", ME(), sq, sq->child, sq->reaped);
+/*@=modfilesys@*/
+#endif
+ }
+
+#ifdef _RPMSQ_DEBUG
+/*@-modfilesys@*/
+if (_rpmsq_debug)
+fprintf(stderr, " Fini(%p): %p child %d status 0x%x\n", ME(), sq, sq->child, sq->status);
+/*@=modfilesys@*/
+#endif
+
+ return sq->reaped;
+}
+
/**
* SIGCHLD cancellation handler.
*/
@@ -203,6 +460,7 @@ rpmsqExecve (const char ** argv)
pid_t pid;
pid_t result;
sigset_t newMask, oldMask;
+ rpmsq sq = memset(alloca(sizeof(*sq)), 0, sizeof(*sq));
DO_LOCK ();
if (ADD_REF (rpmsigTbl_sigchld) == 0) {
@@ -256,7 +514,7 @@ rpmsqExecve (const char ** argv)
DO_LOCK ();
if ((SUB_REF (rpmsigTbl_sigchld) == 0 &&
(rpmsqEnable(-SIGINT, NULL) < 0 || rpmsqEnable (-SIGQUIT, NULL) < 0))
- || sigprocmask (SIG_SETMASK, &oldMask, (sigset_t *) NULL) != 0)
+ || sigprocmask (SIG_SETMASK, &oldMask, NULL) != 0)
{
status = -1;
}
diff --git a/rpmio/rpmsq.h b/rpmio/rpmsq.h
index 4fa96a72c..9e3960ebc 100644
--- a/rpmio/rpmsq.h
+++ b/rpmio/rpmsq.h
@@ -6,14 +6,20 @@
*
*/
+#include <pthread.h>
#include <signal.h>
#include <sys/signal.h>
-#include <search.h>
+#include <search.h> /* XXX insque(3)/remque(3) protos. */
typedef struct rpmsig_s * rpmsig;
typedef struct rpmsqElem * rpmsq;
+/*@-redecl@*/
+/*@unchecked@*/
+extern int _rpmsq_debug;
+/*@=redecl@*/
+
/**
* SIGCHLD queue element.
*/
@@ -21,8 +27,12 @@ struct rpmsqElem {
struct rpmsqElem * q_forw; /*!< for use by insque(3)/remque(3). */
struct rpmsqElem * q_back;
pid_t child; /*!< Currently running child. */
- pid_t reaped; /*!< Reaped waitpid(3) return. */
- int status; /*!< Reaped waitpid(3) status. */
+ volatile pid_t reaped; /*!< Reaped waitpid(3) return. */
+ volatile int status; /*!< Reaped waitpid(3) status. */
+ int reaper; /*!< Register SIGCHLD handler? */
+ void * id; /*!< Blocking thread id (pthread_t). */
+ pthread_mutex_t mutex;
+ pthread_cond_t cond;
};
/*@unchecked@*/
@@ -37,13 +47,13 @@ extern sigset_t rpmsqCaught;
/**
*/
-void Insque(/*@null@*/ void * elem, /*@null@*/ void * prev)
+int rpmsqInsert(/*@null@*/ void * elem, /*@null@*/ void * prev)
/*@globals rpmsqQueue @*/
/*@modifies elem, rpmsqQueue @*/;
/**
*/
-void Remque(/*@null@*/ void * elem)
+int rpmsqRemove(/*@null@*/ void * elem)
/*@modifies elem @*/;
/**
@@ -63,10 +73,27 @@ int rpmsqEnable(int signum, /*@null@*/ sighandler_t handler)
/*@modifies rpmsqCaught, fileSystem, internalState @*/;
/**
+ * Fork a child process.
+ * @param sq scriptlet queue element
+ * @return fork(2) pid
+ */
+pid_t rpmsqFork(rpmsq sq)
+ /*@globals fileSystem, internalState @*/
+ /*@modifies sq, fileSystem, internalState @*/;
+
+/**
+ * Wait for child process to be reaped.
+ * @param sq scriptlet queue element
+ * @return reaped child pid
+ */
+pid_t rpmsqWait(rpmsq sq)
+ /*@globals fileSystem, internalState @*/
+ /*@modifies sq, fileSystem, internalState @*/;
+
+/**
* Execute a command, returning its status.
*/
-int
-rpmsqExecve (const char ** argv)
+int rpmsqExecve (const char ** argv)
/*@*/;
#ifdef __cplusplus