summaryrefslogtreecommitdiff
path: root/db/examples_c/ex_thread.c
diff options
context:
space:
mode:
Diffstat (limited to 'db/examples_c/ex_thread.c')
-rw-r--r--db/examples_c/ex_thread.c604
1 files changed, 604 insertions, 0 deletions
diff --git a/db/examples_c/ex_thread.c b/db/examples_c/ex_thread.c
new file mode 100644
index 000000000..93812ade7
--- /dev/null
+++ b/db/examples_c/ex_thread.c
@@ -0,0 +1,604 @@
+/*-
+ * See the file LICENSE for redistribution information.
+ *
+ * Copyright (c) 1997, 1998, 1999, 2000
+ * Sleepycat Software. All rights reserved.
+ *
+ * $Id: ex_thread.c,v 11.9 2000/05/31 15:10:04 bostic Exp $
+ */
+
+#include "db_config.h"
+
+#ifndef NO_SYSTEM_INCLUDES
+#include <sys/types.h>
+
+#if TIME_WITH_SYS_TIME
+#include <sys/time.h>
+#include <time.h>
+#else
+#if HAVE_SYS_TIME_H
+#include <sys/time.h>
+#else
+#include <time.h>
+#endif
+#endif
+
+#include <errno.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#endif
+
+#include <db.h>
+
+/*
+ * NB: This application is written using POSIX 1003.1b-1993 pthreads
+ * interfaces, which may not be portable to your system.
+ */
+extern int sched_yield __P((void)); /* Pthread yield function. */
+
+DB_ENV *db_init __P((char *));
+void *deadlock __P((void *));
+void fatal __P((char *, int, int));
+int main __P((int, char *[]));
+int reader __P((int));
+void stats __P((void));
+void *trickle __P((void *));
+void *tstart __P((void *));
+void usage __P((void));
+void word __P((void));
+int writer __P((int));
+
+struct _statistics {
+ int aborted; /* Write. */
+ int aborts; /* Read/write. */
+ int adds; /* Write. */
+ int deletes; /* Write. */
+ int txns; /* Write. */
+ int found; /* Read. */
+ int notfound; /* Read. */
+} *perf;
+
+const char
+ *progname = "ex_thread"; /* Program name. */
+
+#define DATABASE "access.db" /* Database name. */
+#define WORDLIST "../test/wordlist" /* Dictionary. */
+
+/*
+ * We can seriously increase the number of collisions and transaction
+ * aborts by yielding the scheduler after every DB call. Specify the
+ * -p option to do this.
+ */
+int punish; /* -p */
+int nlist; /* -n */
+int nreaders; /* -r */
+int verbose; /* -v */
+int nwriters; /* -w */
+
+DB *dbp; /* Database handle. */
+DB_ENV *dbenv; /* Database environment. */
+int nthreads; /* Total threads. */
+char **list; /* Word list. */
+
+/*
+ * ex_thread --
+ * Run a simple threaded application of some numbers of readers and
+ * writers competing for a set of words.
+ *
+ * Example UNIX shell script to run this program:
+ * % rm -rf TESTDIR
+ * % mkdir TESTDIR
+ * % ex_thread -h TESTDIR
+ */
+int
+main(argc, argv)
+ int argc;
+ char *argv[];
+{
+ extern char *optarg;
+ extern int errno, optind;
+ pthread_t *tids;
+ int ch, i, ret;
+ char *home;
+ void *retp;
+
+ nlist = 1000;
+ nreaders = nwriters = 4;
+ home = "TESTDIR";
+ while ((ch = getopt(argc, argv, "h:pn:r:vw:")) != EOF)
+ switch (ch) {
+ case 'h':
+ home = optarg;
+ break;
+ case 'p':
+ punish = 1;
+ break;
+ case 'n':
+ nlist = atoi(optarg);
+ break;
+ case 'r':
+ nreaders = atoi(optarg);
+ break;
+ case 'v':
+ verbose = 1;
+ break;
+ case 'w':
+ nwriters = atoi(optarg);
+ break;
+ case '?':
+ default:
+ usage();
+ }
+ argc -= optind;
+ argv += optind;
+
+ /* Initialize the random number generator. */
+ srand(getpid() | time(NULL));
+
+ /* Build the key list. */
+ word();
+
+ /* Remove the previous database. */
+ (void)unlink(DATABASE);
+
+ /* Initialize the database environment. */
+ dbenv = db_init(home);
+
+ /* Initialize the database. */
+ if ((ret = db_create(&dbp, dbenv, 0)) != 0) {
+ dbenv->err(dbenv, ret, "db_create");
+ (void)dbenv->close(dbenv, 0);
+ return (1);
+ }
+ if ((ret = dbp->set_pagesize(dbp, 1024)) != 0) {
+ dbp->err(dbp, ret, "set_pagesize");
+ goto err;
+ }
+ if ((ret = dbp->open(dbp,
+ DATABASE, NULL, DB_BTREE, DB_CREATE | DB_THREAD, 0664)) != 0) {
+ dbp->err(dbp, ret, "%s: open", DATABASE);
+ goto err;
+ }
+
+ nthreads = nreaders + nwriters + 2;
+ printf("Running: readers %d, writers %d\n", nreaders, nwriters);
+ fflush(stdout);
+
+ /* Create statistics structures, offset by 1. */
+ if ((perf = calloc(nreaders + nwriters + 1, sizeof(*perf))) == NULL)
+ fatal(NULL, errno, 1);
+
+ /* Create thread ID structures. */
+ if ((tids = malloc(nthreads * sizeof(pthread_t))) == NULL)
+ fatal(NULL, errno, 1);
+
+ /* Create reader/writer threads. */
+ for (i = 0; i < nreaders + nwriters; ++i)
+ if (pthread_create(&tids[i], NULL, tstart, (void *)i))
+ fatal("pthread_create", errno, 1);
+
+ /* Create buffer pool trickle thread. */
+ if (pthread_create(&tids[i], NULL, trickle, &i))
+ fatal("pthread_create", errno, 1);
+ ++i;
+
+ /* Create deadlock detector thread. */
+ if (pthread_create(&tids[i], NULL, deadlock, &i))
+ fatal("pthread_create", errno, 1);
+
+ /* Wait for the threads. */
+ for (i = 0; i < nthreads; ++i)
+ (void)pthread_join(tids[i], &retp);
+
+err: (void)dbp->close(dbp, 0);
+ (void)dbenv->close(dbenv, 0);
+
+ return (0);
+}
+
+int
+reader(id)
+ int id;
+{
+ DBT key, data;
+ int n, ret;
+ char buf[64];
+
+ /*
+ * DBT's must use local memory or malloc'd memory if the DB handle
+ * is accessed in a threaded fashion.
+ */
+ memset(&key, 0, sizeof(DBT));
+ memset(&data, 0, sizeof(DBT));
+ data.flags = DB_DBT_MALLOC;
+
+ /*
+ * Read-only threads do not require transaction protection, unless
+ * there's a need for repeatable reads.
+ */
+ for (;;) {
+ /* Pick a key at random, and look it up. */
+ n = rand() % nlist;
+ key.data = list[n];
+ key.size = strlen(key.data);
+
+ if (verbose) {
+ sprintf(buf, "reader: %d: list entry %d\n", id, n);
+ write(STDOUT_FILENO, buf, strlen(buf));
+ }
+
+ switch (ret = dbp->get(dbp, NULL, &key, &data, 0)) {
+ case DB_LOCK_DEADLOCK: /* Deadlock. */
+ ++perf[id].aborts;
+ break;
+ case 0: /* Success. */
+ ++perf[id].found;
+ free(data.data);
+ break;
+ case DB_NOTFOUND: /* Not found. */
+ ++perf[id].notfound;
+ break;
+ default:
+ sprintf(buf,
+ "reader %d: dbp->get: %s", id, (char *)key.data);
+ fatal(buf, ret, 0);
+ }
+ }
+ return (0);
+}
+
+int
+writer(id)
+ int id;
+{
+ DBT key, data;
+ DB_TXN *tid;
+ time_t now, then;
+ int n, ret;
+ char buf[256], dbuf[10000];
+
+ time(&now);
+ then = now;
+
+ /*
+ * DBT's must use local memory or malloc'd memory if the DB handle
+ * is accessed in a threaded fashion.
+ */
+ memset(&key, 0, sizeof(DBT));
+ memset(&data, 0, sizeof(DBT));
+ data.data = dbuf;
+ data.ulen = sizeof(dbuf);
+ data.flags = DB_DBT_USERMEM;
+
+ for (;;) {
+ /* Pick a random key. */
+ n = rand() % nlist;
+ key.data = list[n];
+ key.size = strlen(key.data);
+
+ if (verbose) {
+ sprintf(buf, "writer: %d: list entry %d\n", id, n);
+ write(STDOUT_FILENO, buf, strlen(buf));
+ }
+
+ /* Abort and retry. */
+ if (0) {
+retry: if ((ret = txn_abort(tid)) != 0)
+ fatal("txn_abort", ret, 1);
+ ++perf[id].aborts;
+ ++perf[id].aborted;
+ }
+
+ /* Thread #1 prints out the stats every 20 seconds. */
+ if (id == 1) {
+ time(&now);
+ if (now - then >= 20) {
+ stats();
+ then = now;
+ }
+ }
+
+ /* Begin the transaction. */
+ if ((ret = txn_begin(dbenv, NULL, &tid, 0)) != 0)
+ fatal("txn_begin", ret, 1);
+
+ /*
+ * Get the key. If it doesn't exist, add it. If it does
+ * exist, delete it.
+ */
+ switch (ret = dbp->get(dbp, tid, &key, &data, 0)) {
+ case DB_LOCK_DEADLOCK:
+ goto retry;
+ case 0:
+ goto delete;
+ case DB_NOTFOUND:
+ goto add;
+ }
+
+ sprintf(buf, "writer: %d: dbp->get", id);
+ fatal(buf, ret, 1);
+ /* NOTREACHED */
+
+delete: /* Delete the key. */
+ switch (ret = dbp->del(dbp, tid, &key, 0)) {
+ case DB_LOCK_DEADLOCK:
+ goto retry;
+ case 0:
+ ++perf[id].deletes;
+ goto commit;
+ }
+
+ sprintf(buf, "writer: %d: dbp->del", id);
+ fatal(buf, ret, 1);
+ /* NOTREACHED */
+
+add: /* Add the key. 1 data item in 30 is an overflow item. */
+ data.size = 20 + rand() % 128;
+ if (rand() % 30 == 0)
+ data.size += 8192;
+
+ switch (ret = dbp->put(dbp, tid, &key, &data, 0)) {
+ case DB_LOCK_DEADLOCK:
+ goto retry;
+ case 0:
+ ++perf[id].adds;
+ goto commit;
+ default:
+ sprintf(buf, "writer: %d: dbp->put", id);
+ fatal(buf, ret, 1);
+ }
+
+commit: /* The transaction finished, commit it. */
+ if ((ret = txn_commit(tid, 0)) != 0)
+ fatal("txn_commit", ret, 1);
+
+ /*
+ * Every time the thread completes 20 transactions, show
+ * our progress.
+ */
+ if (++perf[id].txns % 20 == 0) {
+ sprintf(buf,
+"writer: %2d: adds: %4d: deletes: %4d: aborts: %4d: txns: %4d\n",
+ id, perf[id].adds, perf[id].deletes,
+ perf[id].aborts, perf[id].txns);
+ write(STDOUT_FILENO, buf, strlen(buf));
+ }
+
+ /*
+ * If this thread was aborted more than 5 times before
+ * the transaction finished, complain.
+ */
+ if (perf[id].aborted > 5) {
+ sprintf(buf,
+"writer: %2d: adds: %4d: deletes: %4d: aborts: %4d: txns: %4d: ABORTED: %2d\n",
+ id, perf[id].adds, perf[id].deletes,
+ perf[id].aborts, perf[id].txns, perf[id].aborted);
+ write(STDOUT_FILENO, buf, strlen(buf));
+ }
+ perf[id].aborted = 0;
+ }
+ return (0);
+}
+
+/*
+ * stats --
+ * Display reader/writer thread statistics. To display the statistics
+ * for the mpool trickle or deadlock threads, use db_stat(1).
+ */
+void
+stats()
+{
+ int id;
+ char *p, buf[8192];
+
+ p = buf + sprintf(buf, "-------------\n");
+ for (id = 0; id < nreaders + nwriters;)
+ if (id++ < nwriters)
+ p += sprintf(p,
+ "writer: %2d: adds: %4d: deletes: %4d: aborts: %4d: txns: %4d\n",
+ id, perf[id].adds,
+ perf[id].deletes, perf[id].aborts, perf[id].txns);
+ else
+ p += sprintf(p,
+ "reader: %2d: found: %5d: notfound: %5d: aborts: %4d\n",
+ id, perf[id].found,
+ perf[id].notfound, perf[id].aborts);
+ p += sprintf(p, "-------------\n");
+
+ write(STDOUT_FILENO, buf, p - buf);
+}
+
+/*
+ * db_init --
+ * Initialize the environment.
+ */
+DB_ENV *
+db_init(home)
+ char *home;
+{
+ DB_ENV *dbenv;
+ int ret;
+
+ if (punish) {
+ (void)db_env_set_pageyield(1);
+ (void)db_env_set_func_yield(sched_yield);
+ }
+
+ if ((ret = db_env_create(&dbenv, 0)) != 0) {
+ fprintf(stderr,
+ "%s: db_env_create: %s\n", progname, db_strerror(ret));
+ exit (1);
+ }
+ dbenv->set_errfile(dbenv, stderr);
+ dbenv->set_errpfx(dbenv, progname);
+ (void)dbenv->set_cachesize(dbenv, 0, 100 * 1024, 0);
+ (void)dbenv->set_lg_max(dbenv, 200000);
+
+ if ((ret = dbenv->open(dbenv, home,
+ DB_CREATE | DB_INIT_LOCK | DB_INIT_LOG |
+ DB_INIT_MPOOL | DB_INIT_TXN | DB_THREAD, 0)) != 0) {
+ dbenv->err(dbenv, ret, NULL);
+ (void)dbenv->close(dbenv, 0);
+ exit (1);
+ }
+ return (dbenv);
+}
+
+/*
+ * tstart --
+ * Thread start function for readers and writers.
+ */
+void *
+tstart(arg)
+ void *arg;
+{
+ pthread_t tid;
+ u_int id;
+
+ id = (u_int)arg + 1;
+
+ tid = pthread_self();
+
+ if (id <= (u_int)nwriters) {
+ printf("write thread %d starting: tid: %lu\n", id, (u_long)tid);
+ fflush(stdout);
+ writer(id);
+ } else {
+ printf("read thread %d starting: tid: %lu\n", id, (u_long)tid);
+ fflush(stdout);
+ reader(id);
+ }
+
+ /* NOTREACHED */
+ return (NULL);
+}
+
+/*
+ * deadlock --
+ * Thread start function for lock_detect().
+ */
+void *
+deadlock(arg)
+ void *arg;
+{
+ struct timeval t;
+ pthread_t tid;
+
+ arg = arg; /* XXX: shut the compiler up. */
+ tid = pthread_self();
+
+ printf("deadlock thread starting: tid: %lu\n", (u_long)tid);
+ fflush(stdout);
+
+ t.tv_sec = 0;
+ t.tv_usec = 100000;
+ for (;;) {
+ (void)lock_detect(dbenv,
+ DB_LOCK_CONFLICT, DB_LOCK_YOUNGEST, NULL);
+
+ /* Check every 100ms. */
+ (void)select(0, NULL, NULL, NULL, &t);
+ }
+
+ /* NOTREACHED */
+ return (NULL);
+}
+
+/*
+ * trickle --
+ * Thread start function for memp_trickle().
+ */
+void *
+trickle(arg)
+ void *arg;
+{
+ pthread_t tid;
+ int wrote;
+ char buf[64];
+
+ arg = arg; /* XXX: shut the compiler up. */
+ tid = pthread_self();
+
+ printf("trickle thread starting: tid: %lu\n", (u_long)tid);
+ fflush(stdout);
+
+ for (;;) {
+ (void)memp_trickle(dbenv, 10, &wrote);
+ if (verbose) {
+ sprintf(buf, "trickle: wrote %d\n", wrote);
+ write(STDOUT_FILENO, buf, strlen(buf));
+ }
+ if (wrote == 0) {
+ sleep(1);
+ sched_yield();
+ }
+ }
+
+ /* NOTREACHED */
+ return (NULL);
+}
+
+/*
+ * word --
+ * Build the dictionary word list.
+ */
+void
+word()
+{
+ FILE *fp;
+ int cnt;
+ char buf[256];
+
+ if ((fp = fopen(WORDLIST, "r")) == NULL)
+ fatal(WORDLIST, errno, 1);
+
+ if ((list = malloc(nlist * sizeof(char *))) == NULL)
+ fatal(NULL, errno, 1);
+
+ for (cnt = 0; cnt < nlist; ++cnt) {
+ if (fgets(buf, sizeof(buf), fp) == NULL)
+ break;
+ if ((list[cnt] = strdup(buf)) == NULL)
+ fatal(NULL, errno, 1);
+ }
+ nlist = cnt; /* In case nlist was larger than possible. */
+}
+
+/*
+ * fatal --
+ * Report a fatal error and quit.
+ */
+void
+fatal(msg, err, syserr)
+ char *msg;
+ int err, syserr;
+{
+ fprintf(stderr, "%s: ", progname);
+ if (msg != NULL) {
+ fprintf(stderr, "%s", msg);
+ if (syserr)
+ fprintf(stderr, ": ");
+ }
+ if (syserr)
+ fprintf(stderr, "%s", strerror(err));
+ fprintf(stderr, "\n");
+ exit (1);
+
+ /* NOTREACHED */
+}
+
+/*
+ * usage --
+ * Usage message.
+ */
+void
+usage()
+{
+ (void)fprintf(stderr,
+ "usage: %s [-pv] [-h home] [-n words] [-r readers] [-w writers]\n",
+ progname);
+ exit(1);
+}