summaryrefslogtreecommitdiff
path: root/rpmio/rpmsq.c
blob: 4e69ad04007d25077f2ba950ffb1ef6c32068ad7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
/** \ingroup rpmio
 * \file rpmio/rpmsq.c
 */

#include "system.h"

#include <signal.h>
#include <sys/signal.h>
#include <sys/wait.h>
#include <search.h>
#include <errno.h>
#include <stdio.h>

#include <pthread.h>

/* XXX suggested in bugzilla #159024 */
#if PTHREAD_MUTEX_DEFAULT != PTHREAD_MUTEX_NORMAL
  #error RPM expects PTHREAD_MUTEX_DEFAULT == PTHREAD_MUTEX_NORMAL
#endif

#ifndef PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP
static pthread_mutex_t rpmsigTbl_lock = PTHREAD_MUTEX_INITIALIZER;
#else
static pthread_mutex_t rpmsigTbl_lock = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
#endif

#define	DO_LOCK()	pthread_mutex_lock(&rpmsigTbl_lock);
#define	DO_UNLOCK()	pthread_mutex_unlock(&rpmsigTbl_lock);
#define	ADD_REF(__tbl)	(__tbl)->active++
#define	SUB_REF(__tbl)	--(__tbl)->active

#define	ME()	((void *)pthread_self())

#define _RPMSQ_INTERNAL
#include <rpm/rpmsq.h>

#include "debug.h"

static struct rpmsqElem rpmsqRock;

static rpmsq rpmsqQueue = &rpmsqRock;

/** \ingroup rpmsq
 * Insert node into from queue.
 * @param elem          node to link
 * @param prev          previous node from queue
 * @return              0 on success
 */
static int rpmsqInsert(void * elem, void * prev)
{
    rpmsq sq = (rpmsq) elem;
    int ret = -1;

    if (sq != NULL) {
	ret = sighold(SIGCHLD);
	if (ret == 0) {
	    sq->child = 0;
	    sq->reaped = 0;
	    sq->status = 0;
	    sq->reaper = 1;
	    sq->pipes[0] = sq->pipes[1] = -1;

	    sq->id = ME();
	    ret = pthread_mutex_init(&sq->mutex, NULL);
	    insque(elem, (prev != NULL ? prev : rpmsqQueue));
	    ret = sigrelse(SIGCHLD);
	}
    }
    return ret;
}

/** \ingroup rpmsq
 * Remove node from queue.
 * @param elem          node to link
 * @return              0 on success
 */
static int rpmsqRemove(void * elem)
{
    rpmsq sq = (rpmsq) elem;
    int ret = -1;

    if (elem != NULL) {
	ret = sighold (SIGCHLD);
	if (ret == 0) {
	    remque(elem);
	   
	    /* Unlock the mutex and then destroy it */ 
	    if((ret = pthread_mutex_unlock(&sq->mutex)) == 0)
		ret = pthread_mutex_destroy(&sq->mutex);

	    sq->id = NULL;
	    if (sq->pipes[1])	ret = close(sq->pipes[1]);
	    if (sq->pipes[0])	ret = close(sq->pipes[0]);
	    sq->pipes[0] = sq->pipes[1] = -1;
	    ret = sigrelse(SIGCHLD);
	}
    }
    return ret;
}

static sigset_t rpmsqCaught;

static struct rpmsig_s {
    int signum;
    rpmsqAction_t handler;
    int active;
    struct sigaction oact;
} rpmsigTbl[] = {
    { SIGINT,	rpmsqAction },
#define	rpmsigTbl_sigint	(&rpmsigTbl[0])
    { SIGQUIT,	rpmsqAction },
#define	rpmsigTbl_sigquit	(&rpmsigTbl[1])
    { SIGCHLD,	rpmsqAction },
#define	rpmsigTbl_sigchld	(&rpmsigTbl[2])
    { SIGHUP,	rpmsqAction },
#define	rpmsigTbl_sighup	(&rpmsigTbl[3])
    { SIGTERM,	rpmsqAction },
#define	rpmsigTbl_sigterm	(&rpmsigTbl[4])
    { SIGPIPE,	rpmsqAction },
#define	rpmsigTbl_sigpipe	(&rpmsigTbl[5])
    { -1,	NULL },
};

int rpmsqIsCaught(int signum)
{
    return sigismember(&rpmsqCaught, signum);
}

#ifdef SA_SIGINFO
void rpmsqAction(int signum, siginfo_t * info, void * context)
#else
void rpmsqAction(int signum)
#endif
{
    int save = errno;
    rpmsig tbl;

    for (tbl = rpmsigTbl; tbl->signum >= 0; tbl++) {
	if (tbl->signum != signum)
	    continue;

	(void) sigaddset(&rpmsqCaught, signum);

	switch (signum) {
	case SIGCHLD:
	    while (1) {
		rpmsq sq;
		int status = 0;
		pid_t reaped = waitpid(0, &status, WNOHANG);

		/* XXX errno set to ECHILD/EINVAL/EINTR. */
		if (reaped <= 0)
		    break;

		/* XXX insque(3)/remque(3) are dequeue, not ring. */
		for (sq = rpmsqQueue->q_forw;
		     sq != NULL && sq != rpmsqQueue;
		     sq = sq->q_forw)
		{
		    int ret;

		    if (sq->child != reaped)
			continue;
		    sq->reaped = reaped;
		    sq->status = status;

		    /* Unlock the mutex.  The waiter will then be able to 
		     * aquire the lock.  
		     *
		     * XXX: jbj, wtd, if this fails? 
		     */
		    ret = pthread_mutex_unlock(&sq->mutex); 

		    break;
		}
	    }
	    break;
	default:
	    break;
	}
	break;
    }
    errno = save;
}

int rpmsqEnable(int signum, rpmsqAction_t handler)
{
    int tblsignum = (signum >= 0 ? signum : -signum);
    struct sigaction sa;
    rpmsig tbl;
    int ret = -1;

    (void) DO_LOCK ();
    if (rpmsqQueue->id == NULL)
	rpmsqQueue->id = ME();
    for (tbl = rpmsigTbl; tbl->signum >= 0; tbl++) {
	if (tblsignum != tbl->signum)
	    continue;

	if (signum >= 0) {			/* Enable. */
	    if (ADD_REF(tbl) <= 0) {
		(void) sigdelset(&rpmsqCaught, tbl->signum);

		/* XXX Don't set a signal handler if already SIG_IGN */
		(void) sigaction(tbl->signum, NULL, &tbl->oact);
		if (tbl->oact.sa_handler == SIG_IGN)
		    continue;

		(void) sigemptyset (&sa.sa_mask);
#ifdef SA_SIGINFO
		sa.sa_flags = SA_SIGINFO;
#else
		sa.sa_flags = 0;
#endif
		sa.sa_sigaction = (handler != NULL ? handler : tbl->handler);
		if (sigaction(tbl->signum, &sa, &tbl->oact) < 0) {
		    SUB_REF(tbl);
		    break;
		}
		tbl->active = 1;		/* XXX just in case */
		if (handler != NULL)
		    tbl->handler = handler;
	    }
	} else {				/* Disable. */
	    if (SUB_REF(tbl) <= 0) {
		if (sigaction(tbl->signum, &tbl->oact, NULL) < 0)
		    break;
		tbl->active = 0;		/* XXX just in case */
		tbl->handler = (handler != NULL ? handler : rpmsqAction);
	    }
	}
	ret = tbl->active;
	break;
    }
    (void) DO_UNLOCK ();
    return ret;
}

pid_t rpmsqFork(rpmsq sq)
{
    pid_t pid;
    int xx;
    int nothreads = 0;   /* XXX: Shouldn't this be a global? */

    if (sq->reaper) {
	xx = rpmsqInsert(sq, NULL);
	xx = rpmsqEnable(SIGCHLD, NULL);
    }

    xx = pipe(sq->pipes);

    xx = sighold(SIGCHLD);

    /* 
     * Initialize the cond var mutex.   We have to aquire the lock we 
     * use for the condition before we fork.  Otherwise it is possible for
     * the child to exit, we get sigchild and the sig handler to send 
     * the condition signal before we are waiting on the condition.
     */
    if (!nothreads) {
	if(pthread_mutex_lock(&sq->mutex)) {
	    /* Yack we did not get the lock, lets just give up */
	    xx = close(sq->pipes[0]);
	    xx = close(sq->pipes[1]);
	    sq->pipes[0] = sq->pipes[1] = -1;
	    goto out;
	}
    }

    pid = fork();
    if (pid < (pid_t) 0) {		/* fork failed.  */
	sq->child = (pid_t)-1;
	xx = close(sq->pipes[0]);
	xx = close(sq->pipes[1]);
	sq->pipes[0] = sq->pipes[1] = -1;
	goto out;
    } else if (pid == (pid_t) 0) {	/* Child. */
	int yy;

	/* Block to permit parent time to wait. */
	xx = close(sq->pipes[1]);
	xx = read(sq->pipes[0], &yy, sizeof(yy));
	xx = close(sq->pipes[0]);
	sq->pipes[0] = sq->pipes[1] = -1;
    } else {				/* Parent. */
	sq->child = pid;
    }

out:
    xx = sigrelse(SIGCHLD);
    return sq->child;
}

/**
 * Wait for child process to be reaped, and unregister SIGCHLD handler.
 * @todo Rewrite to use waitpid on helper thread.
 * @param sq		scriptlet queue element
 * @return		0 on success
 */
static int rpmsqWaitUnregister(rpmsq sq)
{
    int nothreads = 0;
    int ret = 0;
    int xx;

    /* Protect sq->reaped from handler changes. */
    ret = sighold(SIGCHLD);

    /* Start the child, linux often runs child before parent. */
    if (sq->pipes[0] >= 0)
	xx = close(sq->pipes[0]);
    if (sq->pipes[1] >= 0)
	xx = close(sq->pipes[1]);
    sq->pipes[0] = sq->pipes[1] = -1;

    /* Put a stopwatch on the time spent waiting to measure performance gain. */
    (void) rpmswEnter(&sq->op, -1);

    /* Wait for handler to receive SIGCHLD. */
    while (ret == 0 && sq->reaped != sq->child) {
	if (nothreads)
	    /* Note that sigpause re-enables SIGCHLD. */
	    ret = sigpause(SIGCHLD);
	else {
	    xx = sigrelse(SIGCHLD);
	    
	    /* 
	     * We start before the fork with this mutex locked;
	     * The only one that unlocks this the signal handler.
	     * So if we get the lock the child has been reaped.
	     */
	    ret = pthread_mutex_lock(&sq->mutex);
	    xx = sighold(SIGCHLD);
	}
    }

    /* Accumulate stopwatch time spent waiting, potential performance gain. */
    sq->ms_scriptlets += rpmswExit(&sq->op, -1)/1000;

    xx = sigrelse(SIGCHLD);

    /* Remove processed SIGCHLD item from queue. */
    xx = rpmsqRemove(sq);

    /* Disable SIGCHLD handler on refcount == 0. */
    xx = rpmsqEnable(-SIGCHLD, NULL);

    return ret;
}

pid_t rpmsqWait(rpmsq sq)
{
    if (sq->reaper) {
	(void) rpmsqWaitUnregister(sq);
    } else {
	pid_t reaped;
	int status;
	do {
	    reaped = waitpid(sq->child, &status, 0);
	} while (reaped >= 0 && reaped != sq->child);
	sq->reaped = reaped;
	sq->status = status;
    }

    return sq->reaped;
}