diff options
Diffstat (limited to 'db/examples_c/ex_thread.c')
-rw-r--r-- | db/examples_c/ex_thread.c | 604 |
1 files changed, 604 insertions, 0 deletions
diff --git a/db/examples_c/ex_thread.c b/db/examples_c/ex_thread.c new file mode 100644 index 000000000..93812ade7 --- /dev/null +++ b/db/examples_c/ex_thread.c @@ -0,0 +1,604 @@ +/*- + * See the file LICENSE for redistribution information. + * + * Copyright (c) 1997, 1998, 1999, 2000 + * Sleepycat Software. All rights reserved. + * + * $Id: ex_thread.c,v 11.9 2000/05/31 15:10:04 bostic Exp $ + */ + +#include "db_config.h" + +#ifndef NO_SYSTEM_INCLUDES +#include <sys/types.h> + +#if TIME_WITH_SYS_TIME +#include <sys/time.h> +#include <time.h> +#else +#if HAVE_SYS_TIME_H +#include <sys/time.h> +#else +#include <time.h> +#endif +#endif + +#include <errno.h> +#include <pthread.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> +#endif + +#include <db.h> + +/* + * NB: This application is written using POSIX 1003.1b-1993 pthreads + * interfaces, which may not be portable to your system. + */ +extern int sched_yield __P((void)); /* Pthread yield function. */ + +DB_ENV *db_init __P((char *)); +void *deadlock __P((void *)); +void fatal __P((char *, int, int)); +int main __P((int, char *[])); +int reader __P((int)); +void stats __P((void)); +void *trickle __P((void *)); +void *tstart __P((void *)); +void usage __P((void)); +void word __P((void)); +int writer __P((int)); + +struct _statistics { + int aborted; /* Write. */ + int aborts; /* Read/write. */ + int adds; /* Write. */ + int deletes; /* Write. */ + int txns; /* Write. */ + int found; /* Read. */ + int notfound; /* Read. */ +} *perf; + +const char + *progname = "ex_thread"; /* Program name. */ + +#define DATABASE "access.db" /* Database name. */ +#define WORDLIST "../test/wordlist" /* Dictionary. */ + +/* + * We can seriously increase the number of collisions and transaction + * aborts by yielding the scheduler after every DB call. Specify the + * -p option to do this. + */ +int punish; /* -p */ +int nlist; /* -n */ +int nreaders; /* -r */ +int verbose; /* -v */ +int nwriters; /* -w */ + +DB *dbp; /* Database handle. */ +DB_ENV *dbenv; /* Database environment. */ +int nthreads; /* Total threads. */ +char **list; /* Word list. */ + +/* + * ex_thread -- + * Run a simple threaded application of some numbers of readers and + * writers competing for a set of words. + * + * Example UNIX shell script to run this program: + * % rm -rf TESTDIR + * % mkdir TESTDIR + * % ex_thread -h TESTDIR + */ +int +main(argc, argv) + int argc; + char *argv[]; +{ + extern char *optarg; + extern int errno, optind; + pthread_t *tids; + int ch, i, ret; + char *home; + void *retp; + + nlist = 1000; + nreaders = nwriters = 4; + home = "TESTDIR"; + while ((ch = getopt(argc, argv, "h:pn:r:vw:")) != EOF) + switch (ch) { + case 'h': + home = optarg; + break; + case 'p': + punish = 1; + break; + case 'n': + nlist = atoi(optarg); + break; + case 'r': + nreaders = atoi(optarg); + break; + case 'v': + verbose = 1; + break; + case 'w': + nwriters = atoi(optarg); + break; + case '?': + default: + usage(); + } + argc -= optind; + argv += optind; + + /* Initialize the random number generator. */ + srand(getpid() | time(NULL)); + + /* Build the key list. */ + word(); + + /* Remove the previous database. */ + (void)unlink(DATABASE); + + /* Initialize the database environment. */ + dbenv = db_init(home); + + /* Initialize the database. */ + if ((ret = db_create(&dbp, dbenv, 0)) != 0) { + dbenv->err(dbenv, ret, "db_create"); + (void)dbenv->close(dbenv, 0); + return (1); + } + if ((ret = dbp->set_pagesize(dbp, 1024)) != 0) { + dbp->err(dbp, ret, "set_pagesize"); + goto err; + } + if ((ret = dbp->open(dbp, + DATABASE, NULL, DB_BTREE, DB_CREATE | DB_THREAD, 0664)) != 0) { + dbp->err(dbp, ret, "%s: open", DATABASE); + goto err; + } + + nthreads = nreaders + nwriters + 2; + printf("Running: readers %d, writers %d\n", nreaders, nwriters); + fflush(stdout); + + /* Create statistics structures, offset by 1. */ + if ((perf = calloc(nreaders + nwriters + 1, sizeof(*perf))) == NULL) + fatal(NULL, errno, 1); + + /* Create thread ID structures. */ + if ((tids = malloc(nthreads * sizeof(pthread_t))) == NULL) + fatal(NULL, errno, 1); + + /* Create reader/writer threads. */ + for (i = 0; i < nreaders + nwriters; ++i) + if (pthread_create(&tids[i], NULL, tstart, (void *)i)) + fatal("pthread_create", errno, 1); + + /* Create buffer pool trickle thread. */ + if (pthread_create(&tids[i], NULL, trickle, &i)) + fatal("pthread_create", errno, 1); + ++i; + + /* Create deadlock detector thread. */ + if (pthread_create(&tids[i], NULL, deadlock, &i)) + fatal("pthread_create", errno, 1); + + /* Wait for the threads. */ + for (i = 0; i < nthreads; ++i) + (void)pthread_join(tids[i], &retp); + +err: (void)dbp->close(dbp, 0); + (void)dbenv->close(dbenv, 0); + + return (0); +} + +int +reader(id) + int id; +{ + DBT key, data; + int n, ret; + char buf[64]; + + /* + * DBT's must use local memory or malloc'd memory if the DB handle + * is accessed in a threaded fashion. + */ + memset(&key, 0, sizeof(DBT)); + memset(&data, 0, sizeof(DBT)); + data.flags = DB_DBT_MALLOC; + + /* + * Read-only threads do not require transaction protection, unless + * there's a need for repeatable reads. + */ + for (;;) { + /* Pick a key at random, and look it up. */ + n = rand() % nlist; + key.data = list[n]; + key.size = strlen(key.data); + + if (verbose) { + sprintf(buf, "reader: %d: list entry %d\n", id, n); + write(STDOUT_FILENO, buf, strlen(buf)); + } + + switch (ret = dbp->get(dbp, NULL, &key, &data, 0)) { + case DB_LOCK_DEADLOCK: /* Deadlock. */ + ++perf[id].aborts; + break; + case 0: /* Success. */ + ++perf[id].found; + free(data.data); + break; + case DB_NOTFOUND: /* Not found. */ + ++perf[id].notfound; + break; + default: + sprintf(buf, + "reader %d: dbp->get: %s", id, (char *)key.data); + fatal(buf, ret, 0); + } + } + return (0); +} + +int +writer(id) + int id; +{ + DBT key, data; + DB_TXN *tid; + time_t now, then; + int n, ret; + char buf[256], dbuf[10000]; + + time(&now); + then = now; + + /* + * DBT's must use local memory or malloc'd memory if the DB handle + * is accessed in a threaded fashion. + */ + memset(&key, 0, sizeof(DBT)); + memset(&data, 0, sizeof(DBT)); + data.data = dbuf; + data.ulen = sizeof(dbuf); + data.flags = DB_DBT_USERMEM; + + for (;;) { + /* Pick a random key. */ + n = rand() % nlist; + key.data = list[n]; + key.size = strlen(key.data); + + if (verbose) { + sprintf(buf, "writer: %d: list entry %d\n", id, n); + write(STDOUT_FILENO, buf, strlen(buf)); + } + + /* Abort and retry. */ + if (0) { +retry: if ((ret = txn_abort(tid)) != 0) + fatal("txn_abort", ret, 1); + ++perf[id].aborts; + ++perf[id].aborted; + } + + /* Thread #1 prints out the stats every 20 seconds. */ + if (id == 1) { + time(&now); + if (now - then >= 20) { + stats(); + then = now; + } + } + + /* Begin the transaction. */ + if ((ret = txn_begin(dbenv, NULL, &tid, 0)) != 0) + fatal("txn_begin", ret, 1); + + /* + * Get the key. If it doesn't exist, add it. If it does + * exist, delete it. + */ + switch (ret = dbp->get(dbp, tid, &key, &data, 0)) { + case DB_LOCK_DEADLOCK: + goto retry; + case 0: + goto delete; + case DB_NOTFOUND: + goto add; + } + + sprintf(buf, "writer: %d: dbp->get", id); + fatal(buf, ret, 1); + /* NOTREACHED */ + +delete: /* Delete the key. */ + switch (ret = dbp->del(dbp, tid, &key, 0)) { + case DB_LOCK_DEADLOCK: + goto retry; + case 0: + ++perf[id].deletes; + goto commit; + } + + sprintf(buf, "writer: %d: dbp->del", id); + fatal(buf, ret, 1); + /* NOTREACHED */ + +add: /* Add the key. 1 data item in 30 is an overflow item. */ + data.size = 20 + rand() % 128; + if (rand() % 30 == 0) + data.size += 8192; + + switch (ret = dbp->put(dbp, tid, &key, &data, 0)) { + case DB_LOCK_DEADLOCK: + goto retry; + case 0: + ++perf[id].adds; + goto commit; + default: + sprintf(buf, "writer: %d: dbp->put", id); + fatal(buf, ret, 1); + } + +commit: /* The transaction finished, commit it. */ + if ((ret = txn_commit(tid, 0)) != 0) + fatal("txn_commit", ret, 1); + + /* + * Every time the thread completes 20 transactions, show + * our progress. + */ + if (++perf[id].txns % 20 == 0) { + sprintf(buf, +"writer: %2d: adds: %4d: deletes: %4d: aborts: %4d: txns: %4d\n", + id, perf[id].adds, perf[id].deletes, + perf[id].aborts, perf[id].txns); + write(STDOUT_FILENO, buf, strlen(buf)); + } + + /* + * If this thread was aborted more than 5 times before + * the transaction finished, complain. + */ + if (perf[id].aborted > 5) { + sprintf(buf, +"writer: %2d: adds: %4d: deletes: %4d: aborts: %4d: txns: %4d: ABORTED: %2d\n", + id, perf[id].adds, perf[id].deletes, + perf[id].aborts, perf[id].txns, perf[id].aborted); + write(STDOUT_FILENO, buf, strlen(buf)); + } + perf[id].aborted = 0; + } + return (0); +} + +/* + * stats -- + * Display reader/writer thread statistics. To display the statistics + * for the mpool trickle or deadlock threads, use db_stat(1). + */ +void +stats() +{ + int id; + char *p, buf[8192]; + + p = buf + sprintf(buf, "-------------\n"); + for (id = 0; id < nreaders + nwriters;) + if (id++ < nwriters) + p += sprintf(p, + "writer: %2d: adds: %4d: deletes: %4d: aborts: %4d: txns: %4d\n", + id, perf[id].adds, + perf[id].deletes, perf[id].aborts, perf[id].txns); + else + p += sprintf(p, + "reader: %2d: found: %5d: notfound: %5d: aborts: %4d\n", + id, perf[id].found, + perf[id].notfound, perf[id].aborts); + p += sprintf(p, "-------------\n"); + + write(STDOUT_FILENO, buf, p - buf); +} + +/* + * db_init -- + * Initialize the environment. + */ +DB_ENV * +db_init(home) + char *home; +{ + DB_ENV *dbenv; + int ret; + + if (punish) { + (void)db_env_set_pageyield(1); + (void)db_env_set_func_yield(sched_yield); + } + + if ((ret = db_env_create(&dbenv, 0)) != 0) { + fprintf(stderr, + "%s: db_env_create: %s\n", progname, db_strerror(ret)); + exit (1); + } + dbenv->set_errfile(dbenv, stderr); + dbenv->set_errpfx(dbenv, progname); + (void)dbenv->set_cachesize(dbenv, 0, 100 * 1024, 0); + (void)dbenv->set_lg_max(dbenv, 200000); + + if ((ret = dbenv->open(dbenv, home, + DB_CREATE | DB_INIT_LOCK | DB_INIT_LOG | + DB_INIT_MPOOL | DB_INIT_TXN | DB_THREAD, 0)) != 0) { + dbenv->err(dbenv, ret, NULL); + (void)dbenv->close(dbenv, 0); + exit (1); + } + return (dbenv); +} + +/* + * tstart -- + * Thread start function for readers and writers. + */ +void * +tstart(arg) + void *arg; +{ + pthread_t tid; + u_int id; + + id = (u_int)arg + 1; + + tid = pthread_self(); + + if (id <= (u_int)nwriters) { + printf("write thread %d starting: tid: %lu\n", id, (u_long)tid); + fflush(stdout); + writer(id); + } else { + printf("read thread %d starting: tid: %lu\n", id, (u_long)tid); + fflush(stdout); + reader(id); + } + + /* NOTREACHED */ + return (NULL); +} + +/* + * deadlock -- + * Thread start function for lock_detect(). + */ +void * +deadlock(arg) + void *arg; +{ + struct timeval t; + pthread_t tid; + + arg = arg; /* XXX: shut the compiler up. */ + tid = pthread_self(); + + printf("deadlock thread starting: tid: %lu\n", (u_long)tid); + fflush(stdout); + + t.tv_sec = 0; + t.tv_usec = 100000; + for (;;) { + (void)lock_detect(dbenv, + DB_LOCK_CONFLICT, DB_LOCK_YOUNGEST, NULL); + + /* Check every 100ms. */ + (void)select(0, NULL, NULL, NULL, &t); + } + + /* NOTREACHED */ + return (NULL); +} + +/* + * trickle -- + * Thread start function for memp_trickle(). + */ +void * +trickle(arg) + void *arg; +{ + pthread_t tid; + int wrote; + char buf[64]; + + arg = arg; /* XXX: shut the compiler up. */ + tid = pthread_self(); + + printf("trickle thread starting: tid: %lu\n", (u_long)tid); + fflush(stdout); + + for (;;) { + (void)memp_trickle(dbenv, 10, &wrote); + if (verbose) { + sprintf(buf, "trickle: wrote %d\n", wrote); + write(STDOUT_FILENO, buf, strlen(buf)); + } + if (wrote == 0) { + sleep(1); + sched_yield(); + } + } + + /* NOTREACHED */ + return (NULL); +} + +/* + * word -- + * Build the dictionary word list. + */ +void +word() +{ + FILE *fp; + int cnt; + char buf[256]; + + if ((fp = fopen(WORDLIST, "r")) == NULL) + fatal(WORDLIST, errno, 1); + + if ((list = malloc(nlist * sizeof(char *))) == NULL) + fatal(NULL, errno, 1); + + for (cnt = 0; cnt < nlist; ++cnt) { + if (fgets(buf, sizeof(buf), fp) == NULL) + break; + if ((list[cnt] = strdup(buf)) == NULL) + fatal(NULL, errno, 1); + } + nlist = cnt; /* In case nlist was larger than possible. */ +} + +/* + * fatal -- + * Report a fatal error and quit. + */ +void +fatal(msg, err, syserr) + char *msg; + int err, syserr; +{ + fprintf(stderr, "%s: ", progname); + if (msg != NULL) { + fprintf(stderr, "%s", msg); + if (syserr) + fprintf(stderr, ": "); + } + if (syserr) + fprintf(stderr, "%s", strerror(err)); + fprintf(stderr, "\n"); + exit (1); + + /* NOTREACHED */ +} + +/* + * usage -- + * Usage message. + */ +void +usage() +{ + (void)fprintf(stderr, + "usage: %s [-pv] [-h home] [-n words] [-r readers] [-w writers]\n", + progname); + exit(1); +} |