summaryrefslogtreecommitdiff
path: root/examples_stl
diff options
context:
space:
mode:
Diffstat (limited to 'examples_stl')
-rw-r--r--examples_stl/README41
-rw-r--r--examples_stl/StlAccessExample.cpp143
-rw-r--r--examples_stl/StlTpcbExample.cpp626
-rw-r--r--examples_stl/StlTransactionGuideExample.cpp372
-rw-r--r--examples_stl/repquote/README17
-rw-r--r--examples_stl/repquote/StlRepConfigInfo.cpp56
-rw-r--r--examples_stl/repquote/StlRepConfigInfo.h36
-rw-r--r--examples_stl/repquote/StlRepQuoteExample.cpp712
8 files changed, 2003 insertions, 0 deletions
diff --git a/examples_stl/README b/examples_stl/README
new file mode 100644
index 0000000..43f568b
--- /dev/null
+++ b/examples_stl/README
@@ -0,0 +1,41 @@
+# $Id$
+
+StlAccessExample.cpp Simple Database Access.
+
+ Exstl_access uses STL simple features based on the DB access methods.
+
+ Build: make exstl_access
+
+
+repquote/ Replication.
+
+ Exstl_repquote creates a toy stock quote server
+ with DB's single-master, multiple-client replication
+ with communication over TCP, via STL API. See repquote/README.
+
+ Build: make exstl_repquote
+
+
+StlTransactionGuideExample.cpp Multithreaded DB Access.
+
+ StlTxnGuide runs multiple threads to access databases via STL API.
+
+ Build: make StlTxnGuide
+
+
+StlTpcbExample.cpp TPC/B.
+
+ Exstl_tpcb sets up a framework in which to run a TPC/B test
+ via the STL API.
+
+ Database initialization (the -i flag) and running the
+ benchmark (-n flag) must take place separately (i.e.,
+ first create the database, then run 1 or more copies of
+ the benchmark). Furthermore, when running more than one
+ TPCB process, it is necessary to run the deadlock detector
+ (db_deadlock), since it is possible for concurrent tpcb
+ processes to deadlock. For performance measurement, it
+ will also be beneficial to run the db_checkpoint process
+ as well.
+
+ Build: make exstl_tpcb
diff --git a/examples_stl/StlAccessExample.cpp b/examples_stl/StlAccessExample.cpp
new file mode 100644
index 0000000..33a9971
--- /dev/null
+++ b/examples_stl/StlAccessExample.cpp
@@ -0,0 +1,143 @@
+/*-
+ * See the file LICENSE for redistribution information.
+ *
+ * Copyright (c) 2008-2009 Oracle. All rights reserved.
+ *
+ * $Id$
+ */
+#include <sys/types.h>
+
+#include <iostream>
+#include <iomanip>
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+#include <utility>
+
+#include "dbstl_map.h"
+#include "dbstl_vector.h"
+
+using namespace std;
+using namespace dbstl;
+
+#ifdef _WIN32
+extern "C" {
+ extern int getopt(int, char * const *, const char *);
+ extern int optind;
+}
+#else
+#include <unistd.h>
+#endif
+
+#include <db_cxx.h>
+
+using std::cin;
+using std::cout;
+using std::cerr;
+
+class AccessExample
+{
+public:
+ AccessExample();
+ void run();
+
+private:
+ // no need for copy and assignment
+ AccessExample(const AccessExample &);
+ void operator = (const AccessExample &);
+};
+
+int
+usage()
+{
+ (void)fprintf(stderr, "usage: AccessExample");
+ return (EXIT_FAILURE);
+}
+
+int
+main(int argc, char *argv[])
+{
+ // Use a try block just to report any errors.
+ // An alternate approach to using exceptions is to
+ // use error models (see DbEnv::set_error_model()) so
+ // that error codes are returned for all Berkeley DB methods.
+ //
+ try {
+ AccessExample app;
+ app.run();
+ return (EXIT_SUCCESS);
+ }
+ catch (DbException &dbe) {
+ cerr << "AccessExample: " << dbe.what() << "\n";
+ return (EXIT_FAILURE);
+ }
+}
+
+AccessExample::AccessExample()
+{
+}
+
+void AccessExample::run()
+{
+ typedef db_map<char *, char *, ElementHolder<char *> > strmap_t;
+ // Create a map container with inmemory anonymous database.
+ strmap_t dbmap;
+
+ // Insert records into dbmap, where the key is the user
+ // input and the data is the user input in reverse order.
+ //
+ char buf[1024], rbuf[1024];
+ char *p, *t;
+ u_int32_t len;
+
+ for (;;) {
+ // Acquire user input string as key.
+ cout << "input> ";
+ cout.flush();
+
+ cin.getline(buf, sizeof(buf));
+ if (cin.eof())
+ break;
+
+ if ((len = (u_int32_t)strlen(buf)) <= 0)
+ continue;
+ if (strcmp(buf, "quit") == 0)
+ break;
+
+ // Reverse input string as data.
+ for (t = rbuf, p = buf + (len - 1); p >= buf;)
+ *t++ = *p--;
+ *t++ = '\0';
+
+
+ // Insert key/data pair.
+ try {
+ dbmap.insert(make_pair(buf, rbuf));
+ } catch (DbException ex) {
+ if (ex.get_errno() == DB_KEYEXIST) {
+ cout << "Key " << buf << " already exists.\n";
+ } else
+ throw;
+ } catch (...) {
+ throw;
+ }
+ }
+ cout << "\n";
+
+ // We put a try block around this section of code
+ // to ensure that our database is properly closed
+ // in the event of an error.
+ //
+ try {
+ strmap_t::iterator itr;
+ for (itr = dbmap.begin();
+ itr != dbmap.end(); ++itr)
+ cout<<itr->first<<" : "<<itr->second<<endl;
+ }
+ catch (DbException ex) {
+ cerr<<"AccessExample "<<ex.what()<<endl;
+ }
+
+ dbstl_exit();
+
+}
diff --git a/examples_stl/StlTpcbExample.cpp b/examples_stl/StlTpcbExample.cpp
new file mode 100644
index 0000000..d2a7810
--- /dev/null
+++ b/examples_stl/StlTpcbExample.cpp
@@ -0,0 +1,626 @@
+/*-
+ * See the file LICENSE for redistribution information.
+ *
+ * Copyright (c) 1997-2009 Oracle. All rights reserved.
+ *
+ * $Id$
+ */
+
+#include <sys/types.h>
+
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+
+#include <iostream>
+
+#include "dbstl_vector.h"
+#include "dbstl_map.h"
+
+using std::cout;
+using std::cerr;
+
+typedef enum { ACCOUNT, BRANCH, TELLER } FTYPE;
+
+static int invarg(int, char *);
+u_int32_t random_id(FTYPE, u_int32_t, u_int32_t, u_int32_t);
+u_int32_t random_int(u_int32_t, u_int32_t);
+static int usage(void);
+
+int verbose;
+const char *progname = "StlTpcbExample"; // Program name.
+
+// Forward declared data classes
+class Defrec;
+class Histrec;
+
+typedef dbstl::db_map<u_int32_t, Defrec > DefrecMap;
+typedef dbstl::db_vector<Histrec > HistrecVector;
+
+class StlTpcbExample : public DbEnv
+{
+public:
+ void populate(int, int, int, int);
+ void run(int, int, int, int);
+ int txn(DefrecMap *, DefrecMap *, DefrecMap *, HistrecVector *,
+ int accounts, int branches, int tellers);
+ void populateHistory(
+ HistrecVector *, int, u_int32_t, u_int32_t, u_int32_t);
+ void populateTable(
+ DefrecMap *, u_int32_t, u_int32_t, int, const char *);
+
+ // Note: the constructor creates a DbEnv(), which is
+ // not fully initialized until the DbEnv::open() method
+ // is called.
+ //
+ StlTpcbExample(const char *home, int cachesize, int flags);
+
+private:
+ static const char FileName[];
+
+ // no need for copy and assignment
+ StlTpcbExample(const StlTpcbExample &);
+ void operator = (const StlTpcbExample &);
+};
+
+//
+// This program implements a basic TPC/B driver program. To create the
+// TPC/B database, run with the -i (init) flag. The number of records
+// with which to populate the account, history, branch, and teller tables
+// is specified by the a, s, b, and t flags respectively. To run a TPC/B
+// test, use the n flag to indicate a number of transactions to run (note
+// that you can run many of these processes in parallel to simulate a
+// multiuser test run).
+//
+#define TELLERS_PER_BRANCH 100
+#define ACCOUNTS_PER_TELLER 1000
+#define HISTORY_PER_BRANCH 2592000
+
+/*
+ * The default configuration that adheres to TPCB scaling rules requires
+ * nearly 3 GB of space. To avoid requiring that much space for testing,
+ * we set the parameters much lower. If you want to run a valid 10 TPS
+ * configuration, define VALID_SCALING.
+ */
+#ifdef VALID_SCALING
+#define ACCOUNTS 1000000
+#define BRANCHES 10
+#define TELLERS 100
+#define HISTORY 25920000
+#endif
+
+#ifdef TINY
+#define ACCOUNTS 1000
+#define BRANCHES 10
+#define TELLERS 100
+#define HISTORY 10000
+#endif
+
+#if !defined(VALID_SCALING) && !defined(TINY)
+#define ACCOUNTS 100000
+#define BRANCHES 10
+#define TELLERS 100
+#define HISTORY 259200
+#endif
+
+#define HISTORY_LEN 100
+#define RECLEN 100
+#define BEGID 1000000
+
+class Defrec {
+public:
+ u_int32_t id;
+ u_int32_t balance;
+ u_int8_t pad[RECLEN - sizeof(u_int32_t) - sizeof(u_int32_t)];
+};
+
+class Histrec {
+public:
+ u_int32_t aid;
+ u_int32_t bid;
+ u_int32_t tid;
+ u_int32_t amount;
+ u_int8_t pad[RECLEN - 4 * sizeof(u_int32_t)];
+};
+
+int
+main(int argc, char *argv[])
+{
+ unsigned long seed;
+ int accounts, branches, tellers, history;
+ int iflag, mpool, ntxns, txn_no_sync;
+ const char *home;
+ char *endarg;
+
+ home = "TESTDIR";
+ accounts = branches = history = tellers = 0;
+ txn_no_sync = 0;
+ mpool = ntxns = 0;
+ verbose = 0;
+ iflag = 0;
+ seed = (unsigned long)time(NULL);
+
+ for (int i = 1; i < argc; ++i) {
+
+ if (strcmp(argv[i], "-a") == 0) {
+ // Number of account records
+ if ((accounts = atoi(argv[++i])) <= 0)
+ return (invarg('a', argv[i]));
+ }
+ else if (strcmp(argv[i], "-b") == 0) {
+ // Number of branch records
+ if ((branches = atoi(argv[++i])) <= 0)
+ return (invarg('b', argv[i]));
+ }
+ else if (strcmp(argv[i], "-c") == 0) {
+ // Cachesize in bytes
+ if ((mpool = atoi(argv[++i])) <= 0)
+ return (invarg('c', argv[i]));
+ }
+ else if (strcmp(argv[i], "-f") == 0) {
+ // Fast mode: no txn sync.
+ txn_no_sync = 1;
+ }
+ else if (strcmp(argv[i], "-h") == 0) {
+ // DB home.
+ home = argv[++i];
+ }
+ else if (strcmp(argv[i], "-i") == 0) {
+ // Initialize the test.
+ iflag = 1;
+ }
+ else if (strcmp(argv[i], "-n") == 0) {
+ // Number of transactions
+ if ((ntxns = atoi(argv[++i])) <= 0)
+ return (invarg('n', argv[i]));
+ }
+ else if (strcmp(argv[i], "-S") == 0) {
+ // Random number seed.
+ seed = strtoul(argv[++i], &endarg, 0);
+ if (*endarg != '\0')
+ return (invarg('S', argv[i]));
+ }
+ else if (strcmp(argv[i], "-s") == 0) {
+ // Number of history records
+ if ((history = atoi(argv[++i])) <= 0)
+ return (invarg('s', argv[i]));
+ }
+ else if (strcmp(argv[i], "-t") == 0) {
+ // Number of teller records
+ if ((tellers = atoi(argv[++i])) <= 0)
+ return (invarg('t', argv[i]));
+ }
+ else if (strcmp(argv[i], "-v") == 0) {
+ // Verbose option.
+ verbose = 1;
+ }
+ else {
+ return (usage());
+ }
+ }
+
+ srand((unsigned int)seed);
+
+ accounts = accounts == 0 ? ACCOUNTS : accounts;
+ branches = branches == 0 ? BRANCHES : branches;
+ tellers = tellers == 0 ? TELLERS : tellers;
+ history = history == 0 ? HISTORY : history;
+
+ if (verbose)
+ cout << (long)accounts << " Accounts, "
+ << (long)branches << " Branches, "
+ << (long)tellers << " Tellers, "
+ << (long)history << " History\n";
+
+ try {
+ // Initialize the database environment.
+ // Must be done in within a try block, unless you
+ // change the error model in the environment options.
+ //
+ StlTpcbExample app(
+ home, mpool, txn_no_sync ? DB_TXN_NOSYNC : 0);
+
+ if (iflag) {
+ if (ntxns != 0)
+ return (usage());
+ app.populate(accounts, branches, history, tellers);
+ }
+ else {
+ if (ntxns == 0)
+ return (usage());
+ app.run(ntxns, accounts, branches, tellers);
+ }
+
+ dbstl::dbstl_exit();
+ return (EXIT_SUCCESS);
+ }
+ catch (DbException &dbe) {
+ cerr << "StlTpcbExample: " << dbe.what() << "\n";
+ return (EXIT_FAILURE);
+ }
+}
+
+static int
+invarg(int arg, char *str)
+{
+ cerr << "StlTpcbExample: invalid argument for -"
+ << (char)arg << ": " << str << "\n";
+ return (EXIT_FAILURE);
+}
+
+static int
+usage()
+{
+ cerr << "usage: StlTpcbExample [-fiv] [-a accounts] [-b branches]\n"
+ << " [-c cachesize] [-h home] [-n transactions]\n"
+ << " [-S seed] [-s history] [-t tellers]\n";
+ return (EXIT_FAILURE);
+}
+
+StlTpcbExample::StlTpcbExample(const char *home, int cachesize, int flags)
+: DbEnv(DB_CXX_NO_EXCEPTIONS)
+{
+ u_int32_t local_flags;
+
+ set_error_stream(&cerr);
+ set_errpfx("StlTpcbExample");
+ (void)set_lk_detect(DB_LOCK_DEFAULT);
+ (void)set_cachesize(0, cachesize == 0 ?
+ 4 * 1024 * 1024 : (u_int32_t)cachesize, 0);
+
+ set_lk_max_lockers(1024 * 128);
+ set_lk_max_locks(1024 * 128);
+ set_lk_max_objects(1024 * 128);
+ if (flags & (DB_TXN_NOSYNC))
+ set_flags(DB_TXN_NOSYNC, 1);
+ flags &= ~(DB_TXN_NOSYNC);
+
+ local_flags = flags | DB_CREATE | DB_INIT_LOCK | DB_INIT_LOG |
+ DB_INIT_MPOOL | DB_INIT_TXN;
+ open(home, local_flags, 0);
+ dbstl::register_db_env(this);
+}
+
+//
+// Initialize the database to the specified number of accounts, branches,
+// history records, and tellers.
+//
+void
+StlTpcbExample::populate(int accounts, int branches, int history, int tellers)
+{
+ Db *dbp;
+ DefrecMap *accounts_map, *branches_map, *tellers_map;
+ HistrecVector *history_vector;
+
+ int err, oflags;
+ u_int32_t balance, idnum;
+ u_int32_t end_anum, end_bnum, end_tnum;
+ u_int32_t start_anum, start_bnum, start_tnum;
+
+ idnum = BEGID;
+ balance = 500000;
+ oflags = DB_CREATE;
+
+ dbp = new Db(this, DB_CXX_NO_EXCEPTIONS);
+ dbp->set_h_nelem((unsigned int)accounts);
+
+ if ((err = dbp->open(NULL, "account", NULL,
+ DB_HASH, oflags, 0644)) != 0) {
+ DbException except("Account file create failed", err);
+ throw except;
+ }
+
+ dbstl::register_db(dbp);
+ accounts_map = new DefrecMap(dbp, this);
+ start_anum = idnum;
+ populateTable(accounts_map, idnum, balance, accounts, "account");
+ idnum += accounts;
+ end_anum = idnum - 1;
+ // Automatically closes the underlying database.
+ delete accounts_map;
+ dbstl::close_db(dbp);
+ delete dbp;
+ if (verbose)
+ cout << "Populated accounts: "
+ << (long)start_anum << " - " << (long)end_anum << "\n";
+
+ dbp = new Db(this, DB_CXX_NO_EXCEPTIONS);
+ //
+ // Since the number of branches is very small, we want to use very
+ // small pages and only 1 key per page. This is the poor-man's way
+ // of getting key locking instead of page locking.
+ //
+ dbp->set_h_ffactor(1);
+ dbp->set_h_nelem((unsigned int)branches);
+ dbp->set_pagesize(512);
+
+ if ((err = dbp->open(NULL,
+ "branch", NULL, DB_HASH, oflags, 0644)) != 0) {
+ DbException except("Branch file create failed", err);
+ throw except;
+ }
+ dbstl::register_db(dbp);
+ branches_map = new DefrecMap(dbp, this);
+ start_bnum = idnum;
+ populateTable(branches_map, idnum, balance, branches, "branch");
+ idnum += branches;
+ end_bnum = idnum - 1;
+ delete branches_map;
+ dbstl::close_db(dbp);
+ delete dbp;
+
+ if (verbose)
+ cout << "Populated branches: "
+ << (long)start_bnum << " - " << (long)end_bnum << "\n";
+
+ dbp = new Db(this, DB_CXX_NO_EXCEPTIONS);
+ //
+ // In the case of tellers, we also want small pages, but we'll let
+ // the fill factor dynamically adjust itself.
+ //
+ dbp->set_h_ffactor(0);
+ dbp->set_h_nelem((unsigned int)tellers);
+ dbp->set_pagesize(512);
+
+ if ((err = dbp->open(NULL,
+ "teller", NULL, DB_HASH, oflags, 0644)) != 0) {
+ DbException except("Teller file create failed", err);
+ throw except;
+ }
+
+ dbstl::register_db(dbp);
+ tellers_map = new DefrecMap(dbp, this);
+ start_tnum = idnum;
+ populateTable(tellers_map, idnum, balance, tellers, "teller");
+ idnum += tellers;
+ end_tnum = idnum - 1;
+ delete tellers_map;
+ dbstl::close_db(dbp);
+ delete dbp;
+ if (verbose)
+ cout << "Populated tellers: "
+ << (long)start_tnum << " - " << (long)end_tnum << "\n";
+
+ dbp = new Db(this, DB_CXX_NO_EXCEPTIONS);
+ dbp->set_re_len(HISTORY_LEN);
+ if ((err = dbp->open(NULL,
+ "history", NULL, DB_RECNO, oflags, 0644)) != 0) {
+ DbException except("Create of history file failed", err);
+ throw except;
+ }
+
+ dbstl::register_db(dbp);
+ history_vector = new HistrecVector(dbp, this);
+ populateHistory(history_vector, history, accounts, branches, tellers);
+ delete history_vector;
+ dbstl::close_db(dbp);
+ delete dbp;
+}
+
+void
+StlTpcbExample::populateTable(DefrecMap *drm, u_int32_t start_id,
+ u_int32_t balance, int nrecs, const char *msg)
+{
+ Defrec drec;
+ int i;
+ dbstl::pair<dbstl::db_map<u_int32_t, Defrec >::iterator, bool > ib;
+
+ memset(&drec.pad[0], 1, sizeof(drec.pad));
+ try {
+ for (i = 0; i < nrecs; i++) {
+ drec.id = start_id + (u_int32_t)i;
+ drec.balance = balance;
+ ib = drm->insert(dbstl::make_pair(drec.id, drec));
+ if (ib.second == false)
+ throw "failed to insert record";
+ }
+ } catch (...) {
+ throw;
+ }
+}
+
+void
+StlTpcbExample::populateHistory(HistrecVector *hrm, int nrecs,
+ u_int32_t accounts, u_int32_t branches,
+ u_int32_t tellers)
+{
+ Histrec hrec;
+ int i;
+
+ memset(&hrec.pad[0], 1, sizeof(hrec.pad));
+ hrec.amount = 10;
+ try {
+ for (i = 1; i <= nrecs; i++) {
+ hrec.aid = random_id(
+ ACCOUNT, accounts, branches, tellers);
+ hrec.bid = random_id(
+ BRANCH, accounts, branches, tellers);
+ hrec.tid = random_id(
+ TELLER, accounts, branches, tellers);
+ hrm->push_back(hrec);
+ }
+ } catch (...) {
+ throw;
+ }
+}
+
+u_int32_t
+random_int(u_int32_t lo, u_int32_t hi)
+{
+ u_int32_t ret;
+ int t;
+
+ t = rand();
+ ret = (u_int32_t)(((double)t / ((double)(RAND_MAX) + 1)) *
+ (hi - lo + 1));
+ ret += lo;
+ return (ret);
+}
+
+u_int32_t
+random_id(FTYPE type, u_int32_t accounts, u_int32_t branches, u_int32_t tellers)
+{
+ u_int32_t min, max, num;
+
+ max = min = BEGID;
+ num = accounts;
+ switch (type) {
+ case TELLER:
+ min += branches;
+ num = tellers;
+ // Fallthrough
+ case BRANCH:
+ if (type == BRANCH)
+ num = branches;
+ min += accounts;
+ // Fallthrough
+ case ACCOUNT:
+ max = min + num - 1;
+ }
+ return (random_int(min, max));
+}
+
+void
+StlTpcbExample::run(int n, int accounts, int branches, int tellers)
+{
+ Db *adb, *bdb, *hdb, *tdb;
+ DefrecMap *accounts_map, *branches_map, *tellers_map;
+ HistrecVector *history_vector;
+ int failed, oflags, ret, txns;
+ time_t start_time, end_time;
+
+ //
+ // Open the database files.
+ //
+ oflags = DB_AUTO_COMMIT;
+
+ int err;
+ adb = new Db(this, DB_CXX_NO_EXCEPTIONS);
+ if ((err = adb->open(NULL,
+ "account", NULL, DB_UNKNOWN, oflags, 0)) != 0) {
+ DbException except("Open of account file failed", err);
+ throw except;
+ }
+ dbstl::register_db(adb);
+ accounts_map = new DefrecMap(adb);
+
+ bdb = new Db(this, DB_CXX_NO_EXCEPTIONS);
+ if ((err = bdb->open(NULL,
+ "branch", NULL, DB_UNKNOWN, oflags, 0)) != 0) {
+ DbException except("Open of branch file failed", err);
+ throw except;
+ }
+ dbstl::register_db(bdb);
+ branches_map = new DefrecMap(bdb);
+
+ tdb = new Db(this, DB_CXX_NO_EXCEPTIONS);
+ if ((err = tdb->open(NULL,
+ "teller", NULL, DB_UNKNOWN, oflags, 0)) != 0) {
+ DbException except("Open of teller file failed", err);
+ throw except;
+ }
+ dbstl::register_db(tdb);
+ tellers_map = new DefrecMap(tdb);
+
+ hdb = new Db(this, DB_CXX_NO_EXCEPTIONS);
+ if ((err = hdb->open(NULL,
+ "history", NULL, DB_UNKNOWN, oflags, 0)) != 0) {
+ DbException except("Open of history file failed", err);
+ throw except;
+ }
+ dbstl::register_db(hdb);
+ history_vector = new HistrecVector(hdb);
+
+ (void)time(&start_time);
+ for (txns = n, failed = 0; n-- > 0;)
+ if ((ret = txn(accounts_map, branches_map, tellers_map,
+ history_vector, accounts, branches, tellers)) != 0)
+ ++failed;
+ (void)time(&end_time);
+ if (end_time == start_time)
+ ++end_time;
+ // We use printf because it provides much simpler
+ // formatting than iostreams.
+ //
+ printf("%s: %d txns: %d failed, %d sec, %.2f TPS\n", progname,
+ txns, failed, (int)(end_time - start_time),
+ (txns - failed) / (double)(end_time - start_time));
+
+ delete accounts_map;
+ delete branches_map;
+ delete tellers_map;
+ delete history_vector;
+ dbstl::close_all_dbs();
+}
+
+//
+// XXX Figure out the appropriate way to pick out IDs.
+//
+int
+StlTpcbExample::txn(DefrecMap *accounts_map, DefrecMap *branches_map,
+ DefrecMap *tellers_map, HistrecVector *history_vector,
+ int accounts, int branches, int tellers)
+{
+ Histrec hrec;
+ DefrecMap::value_type_wrap::second_type recref, recref2, recref3;
+ int account, branch, teller;
+
+ /*
+ * !!!
+ * This is sample code -- we could move a lot of this into the driver
+ * to make it faster.
+ */
+ account = random_id(ACCOUNT, accounts, branches, tellers);
+ branch = random_id(BRANCH, accounts, branches, tellers);
+ teller = random_id(TELLER, accounts, branches, tellers);
+
+ hrec.aid = account;
+ hrec.bid = branch;
+ hrec.tid = teller;
+ hrec.amount = 10;
+
+ /*
+ * START PER-TRANSACTION TIMING.
+ *
+ * Technically, TPCB requires a limit on response time, you only get
+ * to count transactions that complete within 2 seconds. That's not
+ * an issue for this sample application -- regardless, here's where
+ * the transaction begins.
+ */
+ try {
+ dbstl::begin_txn(0, this);
+
+ /* Account record */
+ recref = (*accounts_map)[account];
+ recref.balance += 10;
+ recref._DB_STL_StoreElement();
+
+ /* Branch record */
+ recref2 = (*branches_map)[branch];
+ recref2.balance += 10;
+ recref2._DB_STL_StoreElement();
+
+ /* Teller record */
+ recref3 = (*tellers_map)[teller];
+ recref3.balance += 10;
+ recref3._DB_STL_StoreElement();
+
+ /* History record */
+ history_vector->push_back(hrec);
+ dbstl::commit_txn(this);
+ /* END PER-TRANSACTION TIMING. */
+ return (0);
+
+ } catch (DbDeadlockException) {
+ dbstl::abort_txn(this);
+ if (verbose)
+ cout << "Transaction A=" << (long)account
+ << " B=" << (long)branch
+ << " T=" << (long)teller << " failed\n";
+ return (DB_LOCK_DEADLOCK);
+ } catch(...) {
+ dbstl::abort_txn(this);
+ throw;
+ }
+}
diff --git a/examples_stl/StlTransactionGuideExample.cpp b/examples_stl/StlTransactionGuideExample.cpp
new file mode 100644
index 0000000..bfc83cb
--- /dev/null
+++ b/examples_stl/StlTransactionGuideExample.cpp
@@ -0,0 +1,372 @@
+/*-
+ * See the file LICENSE for redistribution information.
+ *
+ * Copyright (c) 2008-2009 Oracle. All rights reserved.
+ *
+ * $Id$
+ */
+
+// File txn_guide_stl.cpp
+#include <iostream>
+#include <db_cxx.h>
+
+#include "dbstl_map.h"
+
+#ifdef _WIN32
+#include <windows.h>
+extern "C" {
+ extern int _tgetopt(int nargc, TCHAR* const* nargv, const TCHAR * ostr);
+ extern TCHAR *optarg;
+}
+#define PATHD '\\'
+
+typedef HANDLE thread_t;
+#define thread_create(thrp, attr, func, arg) \
+ (((*(thrp) = CreateThread(NULL, 0, \
+ (LPTHREAD_START_ROUTINE)(func), (arg), 0, NULL)) == NULL) ? -1 : 0)
+#define thread_join(thr, statusp) \
+ ((WaitForSingleObject((thr), INFINITE) == WAIT_OBJECT_0) && \
+ ((statusp == NULL) ? 0 : \
+ (GetExitCodeThread((thr), (LPDWORD)(statusp)) ? 0 : -1)))
+
+typedef HANDLE mutex_t;
+#define mutex_init(m, attr) \
+ (((*(m) = CreateMutex(NULL, FALSE, NULL)) != NULL) ? 0 : -1)
+#define mutex_lock(m) \
+ ((WaitForSingleObject(*(m), INFINITE) == WAIT_OBJECT_0) ? 0 : -1)
+#define mutex_unlock(m) (ReleaseMutex(*(m)) ? 0 : -1)
+#else
+#include <pthread.h>
+#include <unistd.h>
+#define PATHD '/'
+
+typedef pthread_t thread_t;
+#define thread_create(thrp, attr, func, arg) \
+ pthread_create((thrp), (attr), (func), (arg))
+#define thread_join(thr, statusp) pthread_join((thr), (statusp))
+
+typedef pthread_mutex_t mutex_t;
+#define mutex_init(m, attr) pthread_mutex_init((m), (attr))
+#define mutex_lock(m) pthread_mutex_lock(m)
+#define mutex_unlock(m) pthread_mutex_unlock(m)
+#endif
+
+// Run 5 writers threads at a time.
+#define NUMWRITERS 5
+using namespace dbstl;
+typedef db_multimap<const char *, int, ElementHolder<int> > strmap_t;
+// Printing of thread_t is implementation-specific, so we
+// create our own thread IDs for reporting purposes.
+int global_thread_num;
+mutex_t thread_num_lock;
+
+// Forward declarations
+int countRecords(strmap_t *);
+int openDb(Db **, const char *, const char *, DbEnv *, u_int32_t);
+int usage(void);
+void *writerThread(void *);
+
+// Usage function
+int
+usage()
+{
+ std::cerr << " [-h <database_home_directory>] [-m (in memory use)]"
+ << std::endl;
+ return (EXIT_FAILURE);
+}
+
+int
+main(int argc, char *argv[])
+{
+ // Initialize our handles
+ Db *dbp = NULL;
+ DbEnv *envp = NULL;
+
+ thread_t writerThreads[NUMWRITERS];
+ int i, inmem;
+ u_int32_t envFlags;
+ const char *dbHomeDir;
+
+ inmem = 0;
+ // Application name
+ const char *progName = "TxnGuideStl";
+
+ // Database file name
+ const char *fileName = "mydb.db";
+
+ // Parse the command line arguments
+#ifdef _WIN32
+ dbHomeDir = ".\\TESTDIR";
+#else
+ dbHomeDir = "./TESTDIR";
+#endif
+
+ // Env open flags
+ envFlags =
+ DB_CREATE | // Create the environment if it does not exist
+ DB_RECOVER | // Run normal recovery.
+ DB_INIT_LOCK | // Initialize the locking subsystem
+ DB_INIT_LOG | // Initialize the logging subsystem
+ DB_INIT_TXN | // Initialize the transactional subsystem. This
+ // also turns on logging.
+ DB_INIT_MPOOL | // Initialize the memory pool (in-memory cache)
+ DB_THREAD; // Cause the environment to be free-threaded
+
+ try {
+ // Create and open the environment
+ envp = new DbEnv(DB_CXX_NO_EXCEPTIONS);
+
+ // Indicate that we want db to internally perform deadlock
+ // detection. Also indicate that the transaction with
+ // the fewest number of write locks will receive the
+ // deadlock notification in the event of a deadlock.
+ envp->set_lk_detect(DB_LOCK_MINWRITE);
+
+ if (inmem) {
+ envp->set_lg_bsize(64 * 1024 * 1024);
+ envp->open(NULL, envFlags, 0644);
+ fileName = NULL;
+ } else
+ envp->open(dbHomeDir, envFlags, 0644);
+
+ // If we had utility threads (for running checkpoints or
+ // deadlock detection, for example) we would spawn those
+ // here. However, for a simple example such as this,
+ // that is not required.
+
+ // Open the database
+ openDb(&dbp, progName, fileName,
+ envp, DB_DUP);
+
+ // Call this function before any use of dbstl in a single thread
+ // if multiple threads are using dbstl.
+ dbstl::dbstl_startup();
+
+ // We created the dbp and envp handles not via dbstl::open_db/open_env
+ // functions, so we must register the handles in each thread using the
+ // container.
+ dbstl::register_db(dbp);
+ dbstl::register_db_env(envp);
+
+ strmap_t *strmap = new strmap_t(dbp, envp);
+ // Initialize a mutex. Used to help provide thread ids.
+ (void)mutex_init(&thread_num_lock, NULL);
+
+ // Start the writer threads.
+ for (i = 0; i < NUMWRITERS; i++)
+ (void)thread_create(&writerThreads[i], NULL,
+ writerThread, (void *)strmap);
+
+
+ // Join the writers
+ for (i = 0; i < NUMWRITERS; i++)
+ (void)thread_join(writerThreads[i], NULL);
+
+ delete strmap;
+
+ } catch(DbException &e) {
+ std::cerr << "Error opening database environment: "
+ << (inmem ? "NULL" : dbHomeDir) << std::endl;
+ std::cerr << e.what() << std::endl;
+ dbstl_exit();
+ return (EXIT_FAILURE);
+ }
+
+ // Environment and database will be automatically closed by dbstl.
+
+ // Final status message and return.
+
+ std::cout << "I'm all done." << std::endl;
+
+ dbstl_exit();
+ delete envp;
+ return (EXIT_SUCCESS);
+}
+
+// A function that performs a series of writes to a
+// Berkeley DB database. The information written
+// to the database is largely nonsensical, but the
+// mechanism of transactional commit/abort and
+// deadlock detection is illustrated here.
+void *
+writerThread(void *args)
+{
+ int j, thread_num;
+ int max_retries = 1; // Max retry on a deadlock
+ const char *key_strings[] = {"key 1", "key 2", "key 3", "key 4",
+ "key 5", "key 6", "key 7", "key 8",
+ "key 9", "key 10"};
+
+ strmap_t *strmap = (strmap_t *)args;
+ DbEnv *envp = strmap->get_db_env_handle();
+
+ // We created the dbp and envp handles not via dbstl::open_db/open_env
+ // functions, so we must register the handles in each thread using the
+ // container.
+ dbstl::register_db(strmap->get_db_handle());
+ dbstl::register_db_env(envp);
+
+ // Get the thread number
+ (void)mutex_lock(&thread_num_lock);
+ global_thread_num++;
+ thread_num = global_thread_num;
+ (void)mutex_unlock(&thread_num_lock);
+
+ // Initialize the random number generator
+ srand(thread_num);
+
+ // Perform 50 transactions
+ for (int i = 0; i < 1; i++) {
+ DbTxn *txn;
+ int retry = 100;
+ int retry_count = 0, payload;
+ // while loop is used for deadlock retries
+ while (retry--) {
+ // try block used for deadlock detection and
+ // general db exception handling
+ try {
+
+ // Begin our transaction. We group multiple writes in
+ // this thread under a single transaction so as to
+ // (1) show that you can atomically perform multiple
+ // writes at a time, and (2) to increase the chances
+ // of a deadlock occurring so that we can observe our
+ // deadlock detection at work.
+
+ // Normally we would want to avoid the potential for
+ // deadlocks, so for this workload the correct thing
+ // would be to perform our puts with autocommit. But
+ // that would excessively simplify our example, so we
+ // do the "wrong" thing here instead.
+ txn = dbstl::begin_txn(0, envp);
+
+ // Perform the database write for this transaction.
+ for (j = 0; j < 10; j++) {
+ payload = rand() + i;
+ strmap->insert(make_pair(key_strings[j], payload));
+ }
+
+ // countRecords runs a cursor over the entire database.
+ // We do this to illustrate issues of deadlocking
+ std::cout << thread_num << " : Found "
+ << countRecords(strmap)
+ << " records in the database." << std::endl;
+
+ std::cout << thread_num << " : committing txn : " << i
+ << std::endl;
+
+ // commit
+ try {
+ dbstl::commit_txn(envp);
+ } catch (DbException &e) {
+ std::cout << "Error on txn commit: "
+ << e.what() << std::endl;
+ }
+ } catch (DbDeadlockException &) {
+ // First thing that we MUST do is abort the transaction.
+ try {
+ dbstl::abort_txn(envp);
+ } catch (DbException ex1) {
+ std::cout<<ex1.what();
+ }
+
+ // Now we decide if we want to retry the operation.
+ // If we have retried less than max_retries,
+ // increment the retry count and goto retry.
+ if (retry_count < max_retries) {
+ std::cout << "############### Writer " << thread_num
+ << ": Got DB_LOCK_DEADLOCK.\n"
+ << "Retrying write operation."
+ << std::endl;
+ retry_count++;
+
+ } else {
+ // Otherwise, just give up.
+ std::cerr << "Writer " << thread_num
+ << ": Got DeadLockException and out of "
+ << "retries. Giving up." << std::endl;
+ retry = 0;
+ }
+ } catch (DbException &e) {
+ std::cerr << "db_map<> storage failed" << std::endl;
+ std::cerr << e.what() << std::endl;
+ dbstl::abort_txn(envp);
+ retry = 0;
+ } catch (std::exception &ee) {
+ std::cerr << "Unknown exception: " << ee.what() << std::endl;
+ return (0);
+ }
+ }
+ }
+ return (0);
+}
+
+
+// This simply counts the number of records contained in the
+// database and returns the result.
+//
+// Note that this method exists only for illustrative purposes.
+// A more straight-forward way to count the number of records in
+// a database is to use the db_map<>::size() method.
+int
+countRecords(strmap_t *strmap)
+{
+
+ int count = 0;
+ strmap_t::iterator itr;
+ try {
+ // Set the flag used by Db::cursor.
+ for (itr = strmap->begin(); itr != strmap->end(); ++itr)
+ count++;
+ } catch (DbDeadlockException &de) {
+ std::cerr << "countRecords: got deadlock" << std::endl;
+ // itr's cursor will be automatically closed when it is destructed.
+ throw de;
+ } catch (DbException &e) {
+ std::cerr << "countRecords error:" << std::endl;
+ std::cerr << e.what() << std::endl;
+ }
+
+ // itr's cursor will be automatically closed when it is destructed.
+
+ return (count);
+}
+
+
+// Open a Berkeley DB database
+int
+openDb(Db **dbpp, const char *progname, const char *fileName,
+ DbEnv *envp, u_int32_t extraFlags)
+{
+ int ret;
+ u_int32_t openFlags;
+
+ try {
+ Db *dbp = new Db(envp, DB_CXX_NO_EXCEPTIONS);
+
+ // Point to the new'd Db.
+ *dbpp = dbp;
+
+ if (extraFlags != 0)
+ ret = dbp->set_flags(extraFlags);
+
+ // Now open the database.
+ openFlags = DB_CREATE | // Allow database creation
+ DB_READ_UNCOMMITTED | // Allow uncommitted reads
+ DB_AUTO_COMMIT; // Allow autocommit
+
+ dbp->open(NULL, // Txn pointer
+ fileName, // File name
+ NULL, // Logical db name
+ DB_BTREE, // Database type (using btree)
+ openFlags, // Open flags
+ 0); // File mode. Using defaults
+ } catch (DbException &e) {
+ std::cerr << progname << ": openDb: db open failed:" << std::endl;
+ std::cerr << e.what() << std::endl;
+ return (EXIT_FAILURE);
+ }
+
+ return (EXIT_SUCCESS);
+}
+
diff --git a/examples_stl/repquote/README b/examples_stl/repquote/README
new file mode 100644
index 0000000..86d0759
--- /dev/null
+++ b/examples_stl/repquote/README
@@ -0,0 +1,17 @@
+# $Id$
+
+This is the directory for the replication example program.
+
+The example is a toy stock quote server. It uses the replication manager to
+make use of DB replication, and uses STL API to acces the database.
+
+
+StlRepQuoteExample.cpp
+ Contains code to implement the basic functions of the
+ application, and code necessary to configure the
+ application to use Replication Manager.
+
+StlRepConfigInfo.h
+StlRepConfigInfo.cpp
+ Contains code to manage user configurations to this program,
+ including those to the DB replication manager.
diff --git a/examples_stl/repquote/StlRepConfigInfo.cpp b/examples_stl/repquote/StlRepConfigInfo.cpp
new file mode 100644
index 0000000..a80847a
--- /dev/null
+++ b/examples_stl/repquote/StlRepConfigInfo.cpp
@@ -0,0 +1,56 @@
+/*-
+ * See the file LICENSE for redistribution information.
+ *
+ * Copyright (c) 2001-2009 Oracle. All rights reserved.
+ *
+ * $Id$
+ */
+
+#include "StlRepConfigInfo.h"
+
+#include <cstdlib>
+
+RepConfigInfo::RepConfigInfo()
+{
+ start_policy = DB_REP_ELECTION;
+ home = "TESTDIR";
+ got_listen_address = false;
+ totalsites = 0;
+ priority = 100;
+ verbose = false;
+ other_hosts = NULL;
+ ack_policy = DB_REPMGR_ACKS_QUORUM;
+ bulk = false;
+}
+
+RepConfigInfo::~RepConfigInfo()
+{
+ // release any other_hosts structs.
+ if (other_hosts != NULL) {
+ REP_HOST_INFO *CurItem = other_hosts;
+ while (CurItem->next != NULL)
+ {
+ REP_HOST_INFO *TmpItem = CurItem->next;
+ free(CurItem);
+ CurItem = TmpItem;
+ }
+ free(CurItem);
+ }
+ other_hosts = NULL;
+}
+
+void RepConfigInfo::addOtherHost(char* host, int port, bool peer)
+{
+ REP_HOST_INFO *newinfo;
+ newinfo = (REP_HOST_INFO*)malloc(sizeof(REP_HOST_INFO));
+ newinfo->host = host;
+ newinfo->port = port;
+ newinfo->peer = peer;
+ if (other_hosts == NULL) {
+ other_hosts = newinfo;
+ newinfo->next = NULL;
+ } else {
+ newinfo->next = other_hosts;
+ other_hosts = newinfo;
+ }
+}
diff --git a/examples_stl/repquote/StlRepConfigInfo.h b/examples_stl/repquote/StlRepConfigInfo.h
new file mode 100644
index 0000000..a4f36dc
--- /dev/null
+++ b/examples_stl/repquote/StlRepConfigInfo.h
@@ -0,0 +1,36 @@
+/*-
+ * See the file LICENSE for redistribution information.
+ *
+ * Copyright (c) 2001-2009 Oracle. All rights reserved.
+ *
+ * $Id$
+ */
+#include <db_cxx.h>
+
+// Chainable struct used to store host information.
+typedef struct RepHostInfoObj{
+ char* host;
+ int port;
+ bool peer; // only relevant for "other" hosts
+ RepHostInfoObj* next; // used for chaining multiple "other" hosts.
+} REP_HOST_INFO;
+
+class RepConfigInfo {
+public:
+ RepConfigInfo();
+ virtual ~RepConfigInfo();
+
+ void addOtherHost(char* host, int port, bool peer);
+public:
+ u_int32_t start_policy;
+ const char* home;
+ bool got_listen_address;
+ REP_HOST_INFO this_host;
+ int totalsites;
+ int priority;
+ bool verbose;
+ // used to store a set of optional other hosts.
+ REP_HOST_INFO *other_hosts;
+ int ack_policy;
+ bool bulk;
+};
diff --git a/examples_stl/repquote/StlRepQuoteExample.cpp b/examples_stl/repquote/StlRepQuoteExample.cpp
new file mode 100644
index 0000000..bf73d98
--- /dev/null
+++ b/examples_stl/repquote/StlRepQuoteExample.cpp
@@ -0,0 +1,712 @@
+/*-
+ * See the file LICENSE for redistribution information.
+ *
+ * Copyright (c) 2001-2009 Oracle. All rights reserved.
+ *
+ * $Id$
+ */
+
+/*
+ * In this application, we specify all communication via the command line. In
+ * a real application, we would expect that information about the other sites
+ * in the system would be maintained in some sort of configuration file. The
+ * critical part of this interface is that we assume at startup that we can
+ * find out
+ * 1) what our Berkeley DB home environment is,
+ * 2) what host/port we wish to listen on for connections; and
+ * 3) an optional list of other sites we should attempt to connect to.
+ *
+ * These pieces of information are expressed by the following flags.
+ * -h home (required; h stands for home directory)
+ * -l host:port (required; l stands for local)
+ * -C or -M (optional; start up as client or master)
+ * -r host:port (optional; r stands for remote; any number of these may be
+ * specified)
+ * -R host:port (optional; R stands for remote peer; only one of these may
+ * be specified)
+ * -a all|quorum (optional; a stands for ack policy)
+ * -b (optional; b stands for bulk)
+ * -n nsites (optional; number of sites in replication group; defaults to 0
+ * to try to dynamically compute nsites)
+ * -p priority (optional; defaults to 100)
+ * -v (optional; v stands for verbose)
+ */
+
+#include <iostream>
+#include <string>
+#include <sstream>
+
+#include <db_cxx.h>
+#include "StlRepConfigInfo.h"
+#include "dbstl_map.h"
+
+using std::cout;
+using std::cin;
+using std::cerr;
+using std::endl;
+using std::flush;
+using std::istream;
+using std::istringstream;
+using std::string;
+using std::getline;
+using namespace dbstl;
+#define CACHESIZE (10 * 1024 * 1024)
+#define DATABASE "quote.db"
+
+const char *progname = "exstl_repquote";
+
+#include <errno.h>
+#ifdef _WIN32
+#define WIN32_LEAN_AND_MEAN
+#include <windows.h>
+#define snprintf _snprintf
+#define sleep(s) Sleep(1000 * (s))
+
+extern "C" {
+extern int getopt(int, char * const *, const char *);
+extern char *optarg;
+extern int optind;
+}
+
+typedef HANDLE thread_t;
+typedef DWORD thread_exit_status_t;
+#define thread_create(thrp, attr, func, arg) \
+ (((*(thrp) = CreateThread(NULL, 0, \
+ (LPTHREAD_START_ROUTINE)(func), (arg), 0, NULL)) == NULL) ? -1 : 0)
+#define thread_join(thr, statusp) \
+ ((WaitForSingleObject((thr), INFINITE) == WAIT_OBJECT_0) && \
+ GetExitCodeThread((thr), (LPDWORD)(statusp)) ? 0 : -1)
+#else /* !_WIN32 */
+#include <pthread.h>
+
+typedef pthread_t thread_t;
+typedef void* thread_exit_status_t;
+#define thread_create(thrp, attr, func, arg) \
+ pthread_create((thrp), (attr), (func), (arg))
+#define thread_join(thr, statusp) pthread_join((thr), (statusp))
+#endif
+
+// Struct used to store information in Db app_private field.
+typedef struct {
+ bool app_finished;
+ bool in_client_sync;
+ bool is_master;
+} APP_DATA;
+
+static void log(const char *);
+void *checkpoint_thread (void *);
+void *log_archive_thread (void *);
+
+class RepQuoteExample
+{
+public:
+ typedef db_map<char *, char *, ElementHolder<char *> > str_map_t;
+ RepQuoteExample();
+ void init(RepConfigInfo* config);
+ void doloop();
+ int terminate();
+
+ static void event_callback(DbEnv * dbenv, u_int32_t which, void *info);
+
+private:
+ // disable copy constructor.
+ RepQuoteExample(const RepQuoteExample &);
+ void operator = (const RepQuoteExample &);
+
+ // internal data members.
+ APP_DATA app_data;
+ RepConfigInfo *app_config;
+ DbEnv *cur_env;
+ Db *dbp;
+ str_map_t *strmap;
+ thread_t ckp_thr;
+ thread_t lga_thr;
+
+ // private methods.
+ void print_stocks();
+ void prompt();
+ bool open_db(bool creating);
+ void close_db(){
+ delete strmap;
+ strmap = NULL;
+ dbstl::close_db(dbp);
+ dbp = NULL;
+ }
+ static void close_db(Db *&);// Close an unregistered Db handle.
+};
+
+bool RepQuoteExample::open_db(bool creating)
+{
+ int ret;
+
+ if (dbp)
+ return true;
+
+ dbp = new Db(cur_env, DB_CXX_NO_EXCEPTIONS);
+
+ u_int32_t flags = DB_AUTO_COMMIT | DB_THREAD;
+ if (creating)
+ flags |= DB_CREATE;
+
+ ret = dbp->open(NULL, DATABASE, NULL, DB_BTREE, flags, 0);
+ switch (ret) {
+ case 0:
+ register_db(dbp);
+ if (strmap)
+ delete strmap;
+ strmap = new str_map_t(dbp, cur_env);
+ return (true);
+ case DB_LOCK_DEADLOCK: // Fall through
+ case DB_REP_HANDLE_DEAD:
+ log("\nFailed to open stock db.");
+ break;
+ default:
+ if (ret == DB_REP_LOCKOUT)
+ break; // Fall through
+ else if (ret == ENOENT && !creating)
+ log("\nStock DB does not yet exist\n");
+ else {
+ DbException ex(ret);
+ throw ex;
+ }
+ } // switch
+
+ // (All retryable errors fall through to here.)
+ //
+ log("\nPlease retry the operation");
+ close_db(dbp);
+ return (false);
+}
+
+void RepQuoteExample::close_db(Db *&dbp)
+{
+ if (dbp) {
+ try {
+ dbp->close(0);
+ delete dbp;
+ dbp = 0;
+ } catch (...) {
+ delete dbp;
+ dbp = 0;
+ throw;
+ }
+ }
+
+}
+
+RepQuoteExample::RepQuoteExample() : app_config(0), cur_env(NULL) {
+ app_data.app_finished = 0;
+ app_data.in_client_sync = 0;
+ app_data.is_master = 0; // assume I start out as client
+ cur_env = new DbEnv(DB_CXX_NO_EXCEPTIONS);
+ strmap = NULL;
+ dbp = NULL;
+}
+
+void RepQuoteExample::init(RepConfigInfo *config) {
+ app_config = config;
+
+ cur_env->set_app_private(&app_data);
+ cur_env->set_errfile(stderr);
+ cur_env->set_errpfx(progname);
+ cur_env->set_event_notify(event_callback);
+
+ // Configure bulk transfer to send groups of records to clients
+ // in a single network transfer. This is useful for master sites
+ // and clients participating in client-to-client synchronization.
+ //
+ if (app_config->bulk)
+ cur_env->rep_set_config(DB_REP_CONF_BULK, 1);
+
+
+ // Set the total number of sites in the replication group.
+ // This is used by repmgr internal election processing.
+ //
+ if (app_config->totalsites > 0)
+ cur_env->rep_set_nsites(app_config->totalsites);
+
+ // Turn on debugging and informational output if requested.
+ if (app_config->verbose)
+ cur_env->set_verbose(DB_VERB_REPLICATION, 1);
+
+ // Set replication group election priority for this environment.
+ // An election first selects the site with the most recent log
+ // records as the new master. If multiple sites have the most
+ // recent log records, the site with the highest priority value
+ // is selected as master.
+ //
+ cur_env->rep_set_priority(app_config->priority);
+
+ // Set the policy that determines how master and client sites
+ // handle acknowledgement of replication messages needed for
+ // permanent records. The default policy of "quorum" requires only
+ // a quorum of electable peers sufficient to ensure a permanent
+ // record remains durable if an election is held. The "all" option
+ // requires all clients to acknowledge a permanent replication
+ // message instead.
+ //
+ cur_env->repmgr_set_ack_policy(app_config->ack_policy);
+
+ // Set the threshold for the minimum and maximum time the client
+ // waits before requesting retransmission of a missing message.
+ // Base these values on the performance and load characteristics
+ // of the master and client host platforms as well as the round
+ // trip message time.
+ //
+ cur_env->rep_set_request(20000, 500000);
+
+ // Configure deadlock detection to ensure that any deadlocks
+ // are broken by having one of the conflicting lock requests
+ // rejected. DB_LOCK_DEFAULT uses the lock policy specified
+ // at environment creation time or DB_LOCK_RANDOM if none was
+ // specified.
+ //
+ cur_env->set_lk_detect(DB_LOCK_DEFAULT);
+
+ // The following base replication features may also be useful to your
+ // application. See Berkeley DB documentation for more details.
+ // - Master leases: Provide stricter consistency for data reads
+ // on a master site.
+ // - Timeouts: Customize the amount of time Berkeley DB waits
+ // for such things as an election to be concluded or a master
+ // lease to be granted.
+ // - Delayed client synchronization: Manage the master site's
+ // resources by spreading out resource-intensive client
+ // synchronizations.
+ // - Blocked client operations: Return immediately with an error
+ // instead of waiting indefinitely if a client operation is
+ // blocked by an ongoing client synchronization.
+
+ cur_env->repmgr_set_local_site(app_config->this_host.host,
+ app_config->this_host.port, 0);
+
+ for ( REP_HOST_INFO *cur = app_config->other_hosts; cur != NULL;
+ cur = cur->next) {
+ cur_env->repmgr_add_remote_site(cur->host, cur->port,
+ NULL, cur->peer ? DB_REPMGR_PEER : 0);
+ }
+
+ // Configure heartbeat timeouts so that repmgr monitors the
+ // health of the TCP connection. Master sites broadcast a heartbeat
+ // at the frequency specified by the DB_REP_HEARTBEAT_SEND timeout.
+ // Client sites wait for message activity the length of the
+ // DB_REP_HEARTBEAT_MONITOR timeout before concluding that the
+ // connection to the master is lost. The DB_REP_HEARTBEAT_MONITOR
+ // timeout should be longer than the DB_REP_HEARTBEAT_SEND timeout.
+ //
+ cur_env->rep_set_timeout(DB_REP_HEARTBEAT_SEND, 5000000);
+ cur_env->rep_set_timeout(DB_REP_HEARTBEAT_MONITOR, 10000000);
+
+ // The following repmgr features may also be useful to your
+ // application. See Berkeley DB documentation for more details.
+ // - Two-site strict majority rule - In a two-site replication
+ // group, require both sites to be available to elect a new
+ // master.
+ // - Timeouts - Customize the amount of time repmgr waits
+ // for such things as waiting for acknowledgements or attempting
+ // to reconnect to other sites.
+ // - Site list - return a list of sites currently known to repmgr.
+
+ // We can now open our environment, although we're not ready to
+ // begin replicating. However, we want to have a dbenv around
+ // so that we can send it into any of our message handlers.
+ cur_env->set_cachesize(0, CACHESIZE, 0);
+ cur_env->set_flags(DB_TXN_NOSYNC, 1);
+
+ cur_env->open(app_config->home, DB_CREATE | DB_RECOVER |
+ DB_THREAD | DB_INIT_REP | DB_INIT_LOCK | DB_INIT_LOG |
+ DB_INIT_MPOOL | DB_INIT_TXN, 0);
+
+ // Start checkpoint and log archive support threads.
+ (void)thread_create(&ckp_thr, NULL, checkpoint_thread, cur_env);
+ (void)thread_create(&lga_thr, NULL, log_archive_thread, cur_env);
+
+ dbstl::register_db_env(cur_env);
+ cur_env->repmgr_start(3, app_config->start_policy);
+}
+
+int RepQuoteExample::terminate() {
+ try {
+ // Wait for checkpoint and log archive threads to finish.
+ // Windows does not allow NULL pointer for exit code variable.
+ thread_exit_status_t exstat;
+
+ (void)thread_join(lga_thr, &exstat);
+ (void)thread_join(ckp_thr, &exstat);
+
+ // We have used the DB_TXN_NOSYNC environment flag for
+ // improved performance without the usual sacrifice of
+ // transactional durability, as discussed in the
+ // "Transactional guarantees" page of the Reference
+ // Guide: if one replication site crashes, we can
+ // expect the data to exist at another site. However,
+ // in case we shut down all sites gracefully, we push
+ // out the end of the log here so that the most
+ // recent transactions don't mysteriously disappear.
+ cur_env->log_flush(NULL);
+ } catch (DbException dbe) {
+ cout << "\nerror closing environment: " << dbe.what() << endl;
+ }
+ return 0;
+}
+
+void RepQuoteExample::prompt() {
+ cout << "QUOTESERVER";
+ if (!app_data.is_master)
+ cout << "(read-only)";
+ cout << "> " << flush;
+}
+
+void log(const char *msg) {
+ cerr << msg << endl;
+}
+
+// Simple command-line user interface:
+// - enter "<stock symbol> <price>" to insert or update a record in the
+// database;
+// - just press Return (i.e., blank input line) to print out the contents of
+// the database;
+// - enter "quit" or "exit" to quit.
+//
+void RepQuoteExample::doloop() {
+ string input;
+
+ while (prompt(), getline(cin, input)) {
+ istringstream is(input);
+ string token1, token2;
+
+ // Read 0, 1 or 2 tokens from the input.
+ //
+ int count = 0;
+ if (is >> token1) {
+ count++;
+ if (is >> token2)
+ count++;
+ }
+
+ if (count == 1) {
+ if (token1 == "exit" || token1 == "quit") {
+ app_data.app_finished = 1;
+ break;
+ } else {
+ log("\nFormat: <stock> <price>\n");
+ continue;
+ }
+ }
+
+ // Here we know count is either 0 or 2, so we're about to try a
+ // DB operation.
+ //
+ // Open database with DB_CREATE only if this is a master
+ // database. A client database uses polling to attempt
+ // to open the database without DB_CREATE until it is
+ // successful.
+ //
+ // This DB_CREATE polling logic can be simplified under
+ // some circumstances. For example, if the application can
+ // be sure a database is already there, it would never need
+ // to open it with DB_CREATE.
+ //
+ if (!open_db(app_data.is_master))
+ continue;
+
+ try {
+ if (count == 0)
+ if (app_data.in_client_sync)
+ log(
+ "Cannot read data during client initialization - please try again.");
+ else
+ print_stocks();
+ else if (!app_data.is_master)
+ log("\nCan't update at client\n");
+ else {
+ char *symbol = new char[token1.length() + 1];
+ strcpy(symbol, token1.c_str());
+ char *price = new char[token2.length() + 1];
+ strcpy(price, token2.c_str());
+ begin_txn(0, cur_env);
+ strmap->insert(make_pair(symbol, price));
+ commit_txn(cur_env);
+ delete symbol;
+ delete price;
+ }
+ } catch (DbDeadlockException e) {
+ log("\nplease retry the operation\n");
+ close_db();
+ } catch (DbRepHandleDeadException e) {
+ log("\nplease retry the operation\n");
+ close_db();
+ } catch (DbException e) {
+ if (e.get_errno() == DB_REP_LOCKOUT) {
+ log("\nplease retry the operation\n");
+ close_db();
+ } else
+ throw;
+ }
+
+ }
+
+ close_db();
+}
+
+void RepQuoteExample::event_callback(DbEnv* dbenv, u_int32_t which, void *info)
+{
+ APP_DATA *app = (APP_DATA*)dbenv->get_app_private();
+
+ info = NULL; /* Currently unused. */
+
+ switch (which) {
+ case DB_EVENT_REP_MASTER:
+ app->in_client_sync = 0;
+ app->is_master = 1;
+ break;
+
+ case DB_EVENT_REP_CLIENT:
+ app->is_master = 0;
+ app->in_client_sync = 1;
+ break;
+
+ case DB_EVENT_REP_STARTUPDONE:
+ app->in_client_sync = 0;
+ break;
+ case DB_EVENT_REP_NEWMASTER:
+ app->in_client_sync = 1;
+ break;
+
+ case DB_EVENT_REP_PERM_FAILED:
+ // Did not get enough acks to guarantee transaction
+ // durability based on the configured ack policy. This
+ // transaction will be flushed to the master site's
+ // local disk storage for durability.
+ //
+ log(
+ "Insufficient acknowledgements to guarantee transaction durability.");
+ break;
+
+ default:
+ dbenv->errx("\nignoring event %d", which);
+ }
+}
+
+void RepQuoteExample::print_stocks() {
+#define MAXKEYSIZE 10
+#define MAXDATASIZE 20
+
+ cout << "\tSymbol\tPrice" << endl
+ << "\t======\t=====" << endl;
+ str_map_t::iterator itr;
+ if (strmap == NULL)
+ strmap = new str_map_t(dbp, cur_env);
+ begin_txn(0, cur_env);
+ for (itr = strmap->begin(); itr != strmap->end(); ++itr)
+ cout<<"\t"<<itr->first<<"\t"<<itr->second<<endl;
+ commit_txn(cur_env);
+ cout << endl << flush;
+}
+
+static void usage() {
+ cerr << "usage: " << progname << endl
+ << "[-h home][-o host:port][-m host:port][-f host:port]"
+ << "[-n nsites][-p priority]" << endl;
+
+ cerr << "\t -h home (required; h stands for home directory)" << endl
+ << "\t -l host:port (required; l stands for local)" << endl
+ << "\t -C or -M (optional; start up as client or master)" << endl
+ << "\t -r host:port (optional; r stands for remote; any "
+ << "number of these" << endl
+ << "\t may be specified)" << endl
+ << "\t -R host:port (optional; R stands for remote peer; only "
+ << "one of" << endl
+ << "\t these may be specified)" << endl
+ << "\t -a all|quorum (optional; a stands for ack policy)" << endl
+ << "\t -b (optional; b stands for bulk)" << endl
+ << "\t -n nsites (optional; number of sites in replication "
+ << "group; defaults " << endl
+ << "\t to 0 to try to dynamically compute nsites)" << endl
+ << "\t -p priority (optional; defaults to 100)" << endl
+ << "\t -v (optional; v stands for verbose)" << endl;
+
+ exit(EXIT_FAILURE);
+}
+
+int main(int argc, char **argv) {
+ RepConfigInfo config;
+ char ch, *portstr, *tmphost;
+ int tmpport;
+ bool tmppeer;
+
+ // Extract the command line parameters
+ while ((ch = getopt(argc, argv, "a:bCh:l:Mn:p:R:r:v")) != EOF) {
+ tmppeer = false;
+ switch (ch) {
+ case 'a':
+ if (strncmp(optarg, "all", 3) == 0)
+ config.ack_policy = DB_REPMGR_ACKS_ALL;
+ else if (strncmp(optarg, "quorum", 6) != 0)
+ usage();
+ break;
+ case 'b':
+ config.bulk = true;
+ break;
+ case 'C':
+ config.start_policy = DB_REP_CLIENT;
+ break;
+ case 'h':
+ config.home = optarg;
+ break;
+ case 'l':
+ config.this_host.host = strtok(optarg, ":");
+ if ((portstr = strtok(NULL, ":")) == NULL) {
+ cerr << "\nBad host specification." << endl;
+ usage();
+ }
+ config.this_host.port = (unsigned short)atoi(portstr);
+ config.got_listen_address = true;
+ break;
+ case 'M':
+ config.start_policy = DB_REP_MASTER;
+ break;
+ case 'n':
+ config.totalsites = atoi(optarg);
+ break;
+ case 'p':
+ config.priority = atoi(optarg);
+ break;
+ case 'R':
+ tmppeer = true; // FALLTHROUGH
+ case 'r':
+ tmphost = strtok(optarg, ":");
+ if ((portstr = strtok(NULL, ":")) == NULL) {
+ cerr << "Bad host specification." << endl;
+ usage();
+ }
+ tmpport = (unsigned short)atoi(portstr);
+
+ config.addOtherHost(tmphost, tmpport, tmppeer);
+
+ break;
+ case 'v':
+ config.verbose = true;
+ break;
+ case '?':
+ default:
+ usage();
+ }
+ }
+
+ // Error check command line.
+ if ((!config.got_listen_address) || config.home == NULL)
+ usage();
+
+ RepQuoteExample runner;
+ try {
+ runner.init(&config);
+ runner.doloop();
+ } catch (DbException dbe) {
+ cerr << "\nCaught an exception during initialization or"
+ << " processing: " << dbe.what() << endl;
+ }
+ runner.terminate();
+ return 0;
+}
+
+// This is a very simple thread that performs checkpoints at a fixed
+// time interval. For a master site, the time interval is one minute
+// plus the duration of the checkpoint_delay timeout (30 seconds by
+// default.) For a client site, the time interval is one minute.
+//
+void *checkpoint_thread(void *args)
+{
+ DbEnv *env;
+ APP_DATA *app;
+ int i, ret;
+
+ env = (DbEnv *)args;
+ app = (APP_DATA *)env->get_app_private();
+
+ for (;;) {
+ // Wait for one minute, polling once per second to see if
+ // application has finished. When application has finished,
+ // terminate this thread.
+ //
+ for (i = 0; i < 60; i++) {
+ sleep(1);
+ if (app->app_finished == 1)
+ return ((void *)EXIT_SUCCESS);
+ }
+
+ // Perform a checkpoint.
+ if ((ret = env->txn_checkpoint(0, 0, 0)) != 0) {
+ env->err(ret, "Could not perform checkpoint.\n");
+ return ((void *)EXIT_FAILURE);
+ }
+ }
+}
+
+// This is a simple log archive thread. Once per minute, it removes all but
+// the most recent 3 logs that are safe to remove according to a call to
+// DBENV->log_archive().
+//
+// Log cleanup is needed to conserve disk space, but aggressive log cleanup
+// can cause more frequent client initializations if a client lags too far
+// behind the current master. This can happen in the event of a slow client,
+// a network partition, or a new master that has not kept as many logs as the
+// previous master.
+//
+// The approach in this routine balances the need to mitigate against a
+// lagging client by keeping a few more of the most recent unneeded logs
+// with the need to conserve disk space by regularly cleaning up log files.
+// Use of automatic log removal (DBENV->log_set_config() DB_LOG_AUTO_REMOVE
+// flag) is not recommended for replication due to the risk of frequent
+// client initializations.
+//
+void *log_archive_thread(void *args)
+{
+ DbEnv *env;
+ APP_DATA *app;
+ char **begin, **list;
+ int i, listlen, logs_to_keep, minlog, ret;
+
+ env = (DbEnv *)args;
+ app = (APP_DATA *)env->get_app_private();
+ logs_to_keep = 3;
+
+ for (;;) {
+ // Wait for one minute, polling once per second to see if
+ // application has finished. When application has finished,
+ // terminate this thread.
+ //
+ for (i = 0; i < 60; i++) {
+ sleep(1);
+ if (app->app_finished == 1)
+ return ((void *)EXIT_SUCCESS);
+ }
+
+ // Get the list of unneeded log files.
+ if ((ret = env->log_archive(&list, DB_ARCH_ABS)) != 0) {
+ env->err(ret, "Could not get log archive list.");
+ return ((void *)EXIT_FAILURE);
+ }
+ if (list != NULL) {
+ listlen = 0;
+ // Get the number of logs in the list.
+ for (begin = list; *begin != NULL; begin++, listlen++);
+ // Remove all but the logs_to_keep most recent
+ // unneeded log files.
+ //
+ minlog = listlen - logs_to_keep;
+ for (begin = list, i= 0; i < minlog; list++, i++) {
+ if ((ret = unlink(*list)) != 0) {
+ env->err(ret,
+ "logclean: remove %s", *list);
+ env->errx(
+ "logclean: Error remove %s", *list);
+ free(begin);
+ return ((void *)EXIT_FAILURE);
+ }
+ }
+ free(begin);
+ }
+ }
+}
+