diff options
Diffstat (limited to 'examples_stl')
-rw-r--r-- | examples_stl/README | 41 | ||||
-rw-r--r-- | examples_stl/StlAccessExample.cpp | 143 | ||||
-rw-r--r-- | examples_stl/StlTpcbExample.cpp | 626 | ||||
-rw-r--r-- | examples_stl/StlTransactionGuideExample.cpp | 372 | ||||
-rw-r--r-- | examples_stl/repquote/README | 17 | ||||
-rw-r--r-- | examples_stl/repquote/StlRepConfigInfo.cpp | 56 | ||||
-rw-r--r-- | examples_stl/repquote/StlRepConfigInfo.h | 36 | ||||
-rw-r--r-- | examples_stl/repquote/StlRepQuoteExample.cpp | 712 |
8 files changed, 2003 insertions, 0 deletions
diff --git a/examples_stl/README b/examples_stl/README new file mode 100644 index 0000000..43f568b --- /dev/null +++ b/examples_stl/README @@ -0,0 +1,41 @@ +# $Id$ + +StlAccessExample.cpp Simple Database Access. + + Exstl_access uses STL simple features based on the DB access methods. + + Build: make exstl_access + + +repquote/ Replication. + + Exstl_repquote creates a toy stock quote server + with DB's single-master, multiple-client replication + with communication over TCP, via STL API. See repquote/README. + + Build: make exstl_repquote + + +StlTransactionGuideExample.cpp Multithreaded DB Access. + + StlTxnGuide runs multiple threads to access databases via STL API. + + Build: make StlTxnGuide + + +StlTpcbExample.cpp TPC/B. + + Exstl_tpcb sets up a framework in which to run a TPC/B test + via the STL API. + + Database initialization (the -i flag) and running the + benchmark (-n flag) must take place separately (i.e., + first create the database, then run 1 or more copies of + the benchmark). Furthermore, when running more than one + TPCB process, it is necessary to run the deadlock detector + (db_deadlock), since it is possible for concurrent tpcb + processes to deadlock. For performance measurement, it + will also be beneficial to run the db_checkpoint process + as well. + + Build: make exstl_tpcb diff --git a/examples_stl/StlAccessExample.cpp b/examples_stl/StlAccessExample.cpp new file mode 100644 index 0000000..33a9971 --- /dev/null +++ b/examples_stl/StlAccessExample.cpp @@ -0,0 +1,143 @@ +/*- + * See the file LICENSE for redistribution information. + * + * Copyright (c) 2008-2009 Oracle. All rights reserved. + * + * $Id$ + */ +#include <sys/types.h> + +#include <iostream> +#include <iomanip> +#include <errno.h> +#include <stdlib.h> +#include <string.h> +#include <utility> + +#include "dbstl_map.h" +#include "dbstl_vector.h" + +using namespace std; +using namespace dbstl; + +#ifdef _WIN32 +extern "C" { + extern int getopt(int, char * const *, const char *); + extern int optind; +} +#else +#include <unistd.h> +#endif + +#include <db_cxx.h> + +using std::cin; +using std::cout; +using std::cerr; + +class AccessExample +{ +public: + AccessExample(); + void run(); + +private: + // no need for copy and assignment + AccessExample(const AccessExample &); + void operator = (const AccessExample &); +}; + +int +usage() +{ + (void)fprintf(stderr, "usage: AccessExample"); + return (EXIT_FAILURE); +} + +int +main(int argc, char *argv[]) +{ + // Use a try block just to report any errors. + // An alternate approach to using exceptions is to + // use error models (see DbEnv::set_error_model()) so + // that error codes are returned for all Berkeley DB methods. + // + try { + AccessExample app; + app.run(); + return (EXIT_SUCCESS); + } + catch (DbException &dbe) { + cerr << "AccessExample: " << dbe.what() << "\n"; + return (EXIT_FAILURE); + } +} + +AccessExample::AccessExample() +{ +} + +void AccessExample::run() +{ + typedef db_map<char *, char *, ElementHolder<char *> > strmap_t; + // Create a map container with inmemory anonymous database. + strmap_t dbmap; + + // Insert records into dbmap, where the key is the user + // input and the data is the user input in reverse order. + // + char buf[1024], rbuf[1024]; + char *p, *t; + u_int32_t len; + + for (;;) { + // Acquire user input string as key. + cout << "input> "; + cout.flush(); + + cin.getline(buf, sizeof(buf)); + if (cin.eof()) + break; + + if ((len = (u_int32_t)strlen(buf)) <= 0) + continue; + if (strcmp(buf, "quit") == 0) + break; + + // Reverse input string as data. + for (t = rbuf, p = buf + (len - 1); p >= buf;) + *t++ = *p--; + *t++ = '\0'; + + + // Insert key/data pair. + try { + dbmap.insert(make_pair(buf, rbuf)); + } catch (DbException ex) { + if (ex.get_errno() == DB_KEYEXIST) { + cout << "Key " << buf << " already exists.\n"; + } else + throw; + } catch (...) { + throw; + } + } + cout << "\n"; + + // We put a try block around this section of code + // to ensure that our database is properly closed + // in the event of an error. + // + try { + strmap_t::iterator itr; + for (itr = dbmap.begin(); + itr != dbmap.end(); ++itr) + cout<<itr->first<<" : "<<itr->second<<endl; + } + catch (DbException ex) { + cerr<<"AccessExample "<<ex.what()<<endl; + } + + dbstl_exit(); + +} diff --git a/examples_stl/StlTpcbExample.cpp b/examples_stl/StlTpcbExample.cpp new file mode 100644 index 0000000..d2a7810 --- /dev/null +++ b/examples_stl/StlTpcbExample.cpp @@ -0,0 +1,626 @@ +/*- + * See the file LICENSE for redistribution information. + * + * Copyright (c) 1997-2009 Oracle. All rights reserved. + * + * $Id$ + */ + +#include <sys/types.h> + +#include <errno.h> +#include <stdlib.h> +#include <string.h> +#include <time.h> + +#include <iostream> + +#include "dbstl_vector.h" +#include "dbstl_map.h" + +using std::cout; +using std::cerr; + +typedef enum { ACCOUNT, BRANCH, TELLER } FTYPE; + +static int invarg(int, char *); +u_int32_t random_id(FTYPE, u_int32_t, u_int32_t, u_int32_t); +u_int32_t random_int(u_int32_t, u_int32_t); +static int usage(void); + +int verbose; +const char *progname = "StlTpcbExample"; // Program name. + +// Forward declared data classes +class Defrec; +class Histrec; + +typedef dbstl::db_map<u_int32_t, Defrec > DefrecMap; +typedef dbstl::db_vector<Histrec > HistrecVector; + +class StlTpcbExample : public DbEnv +{ +public: + void populate(int, int, int, int); + void run(int, int, int, int); + int txn(DefrecMap *, DefrecMap *, DefrecMap *, HistrecVector *, + int accounts, int branches, int tellers); + void populateHistory( + HistrecVector *, int, u_int32_t, u_int32_t, u_int32_t); + void populateTable( + DefrecMap *, u_int32_t, u_int32_t, int, const char *); + + // Note: the constructor creates a DbEnv(), which is + // not fully initialized until the DbEnv::open() method + // is called. + // + StlTpcbExample(const char *home, int cachesize, int flags); + +private: + static const char FileName[]; + + // no need for copy and assignment + StlTpcbExample(const StlTpcbExample &); + void operator = (const StlTpcbExample &); +}; + +// +// This program implements a basic TPC/B driver program. To create the +// TPC/B database, run with the -i (init) flag. The number of records +// with which to populate the account, history, branch, and teller tables +// is specified by the a, s, b, and t flags respectively. To run a TPC/B +// test, use the n flag to indicate a number of transactions to run (note +// that you can run many of these processes in parallel to simulate a +// multiuser test run). +// +#define TELLERS_PER_BRANCH 100 +#define ACCOUNTS_PER_TELLER 1000 +#define HISTORY_PER_BRANCH 2592000 + +/* + * The default configuration that adheres to TPCB scaling rules requires + * nearly 3 GB of space. To avoid requiring that much space for testing, + * we set the parameters much lower. If you want to run a valid 10 TPS + * configuration, define VALID_SCALING. + */ +#ifdef VALID_SCALING +#define ACCOUNTS 1000000 +#define BRANCHES 10 +#define TELLERS 100 +#define HISTORY 25920000 +#endif + +#ifdef TINY +#define ACCOUNTS 1000 +#define BRANCHES 10 +#define TELLERS 100 +#define HISTORY 10000 +#endif + +#if !defined(VALID_SCALING) && !defined(TINY) +#define ACCOUNTS 100000 +#define BRANCHES 10 +#define TELLERS 100 +#define HISTORY 259200 +#endif + +#define HISTORY_LEN 100 +#define RECLEN 100 +#define BEGID 1000000 + +class Defrec { +public: + u_int32_t id; + u_int32_t balance; + u_int8_t pad[RECLEN - sizeof(u_int32_t) - sizeof(u_int32_t)]; +}; + +class Histrec { +public: + u_int32_t aid; + u_int32_t bid; + u_int32_t tid; + u_int32_t amount; + u_int8_t pad[RECLEN - 4 * sizeof(u_int32_t)]; +}; + +int +main(int argc, char *argv[]) +{ + unsigned long seed; + int accounts, branches, tellers, history; + int iflag, mpool, ntxns, txn_no_sync; + const char *home; + char *endarg; + + home = "TESTDIR"; + accounts = branches = history = tellers = 0; + txn_no_sync = 0; + mpool = ntxns = 0; + verbose = 0; + iflag = 0; + seed = (unsigned long)time(NULL); + + for (int i = 1; i < argc; ++i) { + + if (strcmp(argv[i], "-a") == 0) { + // Number of account records + if ((accounts = atoi(argv[++i])) <= 0) + return (invarg('a', argv[i])); + } + else if (strcmp(argv[i], "-b") == 0) { + // Number of branch records + if ((branches = atoi(argv[++i])) <= 0) + return (invarg('b', argv[i])); + } + else if (strcmp(argv[i], "-c") == 0) { + // Cachesize in bytes + if ((mpool = atoi(argv[++i])) <= 0) + return (invarg('c', argv[i])); + } + else if (strcmp(argv[i], "-f") == 0) { + // Fast mode: no txn sync. + txn_no_sync = 1; + } + else if (strcmp(argv[i], "-h") == 0) { + // DB home. + home = argv[++i]; + } + else if (strcmp(argv[i], "-i") == 0) { + // Initialize the test. + iflag = 1; + } + else if (strcmp(argv[i], "-n") == 0) { + // Number of transactions + if ((ntxns = atoi(argv[++i])) <= 0) + return (invarg('n', argv[i])); + } + else if (strcmp(argv[i], "-S") == 0) { + // Random number seed. + seed = strtoul(argv[++i], &endarg, 0); + if (*endarg != '\0') + return (invarg('S', argv[i])); + } + else if (strcmp(argv[i], "-s") == 0) { + // Number of history records + if ((history = atoi(argv[++i])) <= 0) + return (invarg('s', argv[i])); + } + else if (strcmp(argv[i], "-t") == 0) { + // Number of teller records + if ((tellers = atoi(argv[++i])) <= 0) + return (invarg('t', argv[i])); + } + else if (strcmp(argv[i], "-v") == 0) { + // Verbose option. + verbose = 1; + } + else { + return (usage()); + } + } + + srand((unsigned int)seed); + + accounts = accounts == 0 ? ACCOUNTS : accounts; + branches = branches == 0 ? BRANCHES : branches; + tellers = tellers == 0 ? TELLERS : tellers; + history = history == 0 ? HISTORY : history; + + if (verbose) + cout << (long)accounts << " Accounts, " + << (long)branches << " Branches, " + << (long)tellers << " Tellers, " + << (long)history << " History\n"; + + try { + // Initialize the database environment. + // Must be done in within a try block, unless you + // change the error model in the environment options. + // + StlTpcbExample app( + home, mpool, txn_no_sync ? DB_TXN_NOSYNC : 0); + + if (iflag) { + if (ntxns != 0) + return (usage()); + app.populate(accounts, branches, history, tellers); + } + else { + if (ntxns == 0) + return (usage()); + app.run(ntxns, accounts, branches, tellers); + } + + dbstl::dbstl_exit(); + return (EXIT_SUCCESS); + } + catch (DbException &dbe) { + cerr << "StlTpcbExample: " << dbe.what() << "\n"; + return (EXIT_FAILURE); + } +} + +static int +invarg(int arg, char *str) +{ + cerr << "StlTpcbExample: invalid argument for -" + << (char)arg << ": " << str << "\n"; + return (EXIT_FAILURE); +} + +static int +usage() +{ + cerr << "usage: StlTpcbExample [-fiv] [-a accounts] [-b branches]\n" + << " [-c cachesize] [-h home] [-n transactions]\n" + << " [-S seed] [-s history] [-t tellers]\n"; + return (EXIT_FAILURE); +} + +StlTpcbExample::StlTpcbExample(const char *home, int cachesize, int flags) +: DbEnv(DB_CXX_NO_EXCEPTIONS) +{ + u_int32_t local_flags; + + set_error_stream(&cerr); + set_errpfx("StlTpcbExample"); + (void)set_lk_detect(DB_LOCK_DEFAULT); + (void)set_cachesize(0, cachesize == 0 ? + 4 * 1024 * 1024 : (u_int32_t)cachesize, 0); + + set_lk_max_lockers(1024 * 128); + set_lk_max_locks(1024 * 128); + set_lk_max_objects(1024 * 128); + if (flags & (DB_TXN_NOSYNC)) + set_flags(DB_TXN_NOSYNC, 1); + flags &= ~(DB_TXN_NOSYNC); + + local_flags = flags | DB_CREATE | DB_INIT_LOCK | DB_INIT_LOG | + DB_INIT_MPOOL | DB_INIT_TXN; + open(home, local_flags, 0); + dbstl::register_db_env(this); +} + +// +// Initialize the database to the specified number of accounts, branches, +// history records, and tellers. +// +void +StlTpcbExample::populate(int accounts, int branches, int history, int tellers) +{ + Db *dbp; + DefrecMap *accounts_map, *branches_map, *tellers_map; + HistrecVector *history_vector; + + int err, oflags; + u_int32_t balance, idnum; + u_int32_t end_anum, end_bnum, end_tnum; + u_int32_t start_anum, start_bnum, start_tnum; + + idnum = BEGID; + balance = 500000; + oflags = DB_CREATE; + + dbp = new Db(this, DB_CXX_NO_EXCEPTIONS); + dbp->set_h_nelem((unsigned int)accounts); + + if ((err = dbp->open(NULL, "account", NULL, + DB_HASH, oflags, 0644)) != 0) { + DbException except("Account file create failed", err); + throw except; + } + + dbstl::register_db(dbp); + accounts_map = new DefrecMap(dbp, this); + start_anum = idnum; + populateTable(accounts_map, idnum, balance, accounts, "account"); + idnum += accounts; + end_anum = idnum - 1; + // Automatically closes the underlying database. + delete accounts_map; + dbstl::close_db(dbp); + delete dbp; + if (verbose) + cout << "Populated accounts: " + << (long)start_anum << " - " << (long)end_anum << "\n"; + + dbp = new Db(this, DB_CXX_NO_EXCEPTIONS); + // + // Since the number of branches is very small, we want to use very + // small pages and only 1 key per page. This is the poor-man's way + // of getting key locking instead of page locking. + // + dbp->set_h_ffactor(1); + dbp->set_h_nelem((unsigned int)branches); + dbp->set_pagesize(512); + + if ((err = dbp->open(NULL, + "branch", NULL, DB_HASH, oflags, 0644)) != 0) { + DbException except("Branch file create failed", err); + throw except; + } + dbstl::register_db(dbp); + branches_map = new DefrecMap(dbp, this); + start_bnum = idnum; + populateTable(branches_map, idnum, balance, branches, "branch"); + idnum += branches; + end_bnum = idnum - 1; + delete branches_map; + dbstl::close_db(dbp); + delete dbp; + + if (verbose) + cout << "Populated branches: " + << (long)start_bnum << " - " << (long)end_bnum << "\n"; + + dbp = new Db(this, DB_CXX_NO_EXCEPTIONS); + // + // In the case of tellers, we also want small pages, but we'll let + // the fill factor dynamically adjust itself. + // + dbp->set_h_ffactor(0); + dbp->set_h_nelem((unsigned int)tellers); + dbp->set_pagesize(512); + + if ((err = dbp->open(NULL, + "teller", NULL, DB_HASH, oflags, 0644)) != 0) { + DbException except("Teller file create failed", err); + throw except; + } + + dbstl::register_db(dbp); + tellers_map = new DefrecMap(dbp, this); + start_tnum = idnum; + populateTable(tellers_map, idnum, balance, tellers, "teller"); + idnum += tellers; + end_tnum = idnum - 1; + delete tellers_map; + dbstl::close_db(dbp); + delete dbp; + if (verbose) + cout << "Populated tellers: " + << (long)start_tnum << " - " << (long)end_tnum << "\n"; + + dbp = new Db(this, DB_CXX_NO_EXCEPTIONS); + dbp->set_re_len(HISTORY_LEN); + if ((err = dbp->open(NULL, + "history", NULL, DB_RECNO, oflags, 0644)) != 0) { + DbException except("Create of history file failed", err); + throw except; + } + + dbstl::register_db(dbp); + history_vector = new HistrecVector(dbp, this); + populateHistory(history_vector, history, accounts, branches, tellers); + delete history_vector; + dbstl::close_db(dbp); + delete dbp; +} + +void +StlTpcbExample::populateTable(DefrecMap *drm, u_int32_t start_id, + u_int32_t balance, int nrecs, const char *msg) +{ + Defrec drec; + int i; + dbstl::pair<dbstl::db_map<u_int32_t, Defrec >::iterator, bool > ib; + + memset(&drec.pad[0], 1, sizeof(drec.pad)); + try { + for (i = 0; i < nrecs; i++) { + drec.id = start_id + (u_int32_t)i; + drec.balance = balance; + ib = drm->insert(dbstl::make_pair(drec.id, drec)); + if (ib.second == false) + throw "failed to insert record"; + } + } catch (...) { + throw; + } +} + +void +StlTpcbExample::populateHistory(HistrecVector *hrm, int nrecs, + u_int32_t accounts, u_int32_t branches, + u_int32_t tellers) +{ + Histrec hrec; + int i; + + memset(&hrec.pad[0], 1, sizeof(hrec.pad)); + hrec.amount = 10; + try { + for (i = 1; i <= nrecs; i++) { + hrec.aid = random_id( + ACCOUNT, accounts, branches, tellers); + hrec.bid = random_id( + BRANCH, accounts, branches, tellers); + hrec.tid = random_id( + TELLER, accounts, branches, tellers); + hrm->push_back(hrec); + } + } catch (...) { + throw; + } +} + +u_int32_t +random_int(u_int32_t lo, u_int32_t hi) +{ + u_int32_t ret; + int t; + + t = rand(); + ret = (u_int32_t)(((double)t / ((double)(RAND_MAX) + 1)) * + (hi - lo + 1)); + ret += lo; + return (ret); +} + +u_int32_t +random_id(FTYPE type, u_int32_t accounts, u_int32_t branches, u_int32_t tellers) +{ + u_int32_t min, max, num; + + max = min = BEGID; + num = accounts; + switch (type) { + case TELLER: + min += branches; + num = tellers; + // Fallthrough + case BRANCH: + if (type == BRANCH) + num = branches; + min += accounts; + // Fallthrough + case ACCOUNT: + max = min + num - 1; + } + return (random_int(min, max)); +} + +void +StlTpcbExample::run(int n, int accounts, int branches, int tellers) +{ + Db *adb, *bdb, *hdb, *tdb; + DefrecMap *accounts_map, *branches_map, *tellers_map; + HistrecVector *history_vector; + int failed, oflags, ret, txns; + time_t start_time, end_time; + + // + // Open the database files. + // + oflags = DB_AUTO_COMMIT; + + int err; + adb = new Db(this, DB_CXX_NO_EXCEPTIONS); + if ((err = adb->open(NULL, + "account", NULL, DB_UNKNOWN, oflags, 0)) != 0) { + DbException except("Open of account file failed", err); + throw except; + } + dbstl::register_db(adb); + accounts_map = new DefrecMap(adb); + + bdb = new Db(this, DB_CXX_NO_EXCEPTIONS); + if ((err = bdb->open(NULL, + "branch", NULL, DB_UNKNOWN, oflags, 0)) != 0) { + DbException except("Open of branch file failed", err); + throw except; + } + dbstl::register_db(bdb); + branches_map = new DefrecMap(bdb); + + tdb = new Db(this, DB_CXX_NO_EXCEPTIONS); + if ((err = tdb->open(NULL, + "teller", NULL, DB_UNKNOWN, oflags, 0)) != 0) { + DbException except("Open of teller file failed", err); + throw except; + } + dbstl::register_db(tdb); + tellers_map = new DefrecMap(tdb); + + hdb = new Db(this, DB_CXX_NO_EXCEPTIONS); + if ((err = hdb->open(NULL, + "history", NULL, DB_UNKNOWN, oflags, 0)) != 0) { + DbException except("Open of history file failed", err); + throw except; + } + dbstl::register_db(hdb); + history_vector = new HistrecVector(hdb); + + (void)time(&start_time); + for (txns = n, failed = 0; n-- > 0;) + if ((ret = txn(accounts_map, branches_map, tellers_map, + history_vector, accounts, branches, tellers)) != 0) + ++failed; + (void)time(&end_time); + if (end_time == start_time) + ++end_time; + // We use printf because it provides much simpler + // formatting than iostreams. + // + printf("%s: %d txns: %d failed, %d sec, %.2f TPS\n", progname, + txns, failed, (int)(end_time - start_time), + (txns - failed) / (double)(end_time - start_time)); + + delete accounts_map; + delete branches_map; + delete tellers_map; + delete history_vector; + dbstl::close_all_dbs(); +} + +// +// XXX Figure out the appropriate way to pick out IDs. +// +int +StlTpcbExample::txn(DefrecMap *accounts_map, DefrecMap *branches_map, + DefrecMap *tellers_map, HistrecVector *history_vector, + int accounts, int branches, int tellers) +{ + Histrec hrec; + DefrecMap::value_type_wrap::second_type recref, recref2, recref3; + int account, branch, teller; + + /* + * !!! + * This is sample code -- we could move a lot of this into the driver + * to make it faster. + */ + account = random_id(ACCOUNT, accounts, branches, tellers); + branch = random_id(BRANCH, accounts, branches, tellers); + teller = random_id(TELLER, accounts, branches, tellers); + + hrec.aid = account; + hrec.bid = branch; + hrec.tid = teller; + hrec.amount = 10; + + /* + * START PER-TRANSACTION TIMING. + * + * Technically, TPCB requires a limit on response time, you only get + * to count transactions that complete within 2 seconds. That's not + * an issue for this sample application -- regardless, here's where + * the transaction begins. + */ + try { + dbstl::begin_txn(0, this); + + /* Account record */ + recref = (*accounts_map)[account]; + recref.balance += 10; + recref._DB_STL_StoreElement(); + + /* Branch record */ + recref2 = (*branches_map)[branch]; + recref2.balance += 10; + recref2._DB_STL_StoreElement(); + + /* Teller record */ + recref3 = (*tellers_map)[teller]; + recref3.balance += 10; + recref3._DB_STL_StoreElement(); + + /* History record */ + history_vector->push_back(hrec); + dbstl::commit_txn(this); + /* END PER-TRANSACTION TIMING. */ + return (0); + + } catch (DbDeadlockException) { + dbstl::abort_txn(this); + if (verbose) + cout << "Transaction A=" << (long)account + << " B=" << (long)branch + << " T=" << (long)teller << " failed\n"; + return (DB_LOCK_DEADLOCK); + } catch(...) { + dbstl::abort_txn(this); + throw; + } +} diff --git a/examples_stl/StlTransactionGuideExample.cpp b/examples_stl/StlTransactionGuideExample.cpp new file mode 100644 index 0000000..bfc83cb --- /dev/null +++ b/examples_stl/StlTransactionGuideExample.cpp @@ -0,0 +1,372 @@ +/*- + * See the file LICENSE for redistribution information. + * + * Copyright (c) 2008-2009 Oracle. All rights reserved. + * + * $Id$ + */ + +// File txn_guide_stl.cpp +#include <iostream> +#include <db_cxx.h> + +#include "dbstl_map.h" + +#ifdef _WIN32 +#include <windows.h> +extern "C" { + extern int _tgetopt(int nargc, TCHAR* const* nargv, const TCHAR * ostr); + extern TCHAR *optarg; +} +#define PATHD '\\' + +typedef HANDLE thread_t; +#define thread_create(thrp, attr, func, arg) \ + (((*(thrp) = CreateThread(NULL, 0, \ + (LPTHREAD_START_ROUTINE)(func), (arg), 0, NULL)) == NULL) ? -1 : 0) +#define thread_join(thr, statusp) \ + ((WaitForSingleObject((thr), INFINITE) == WAIT_OBJECT_0) && \ + ((statusp == NULL) ? 0 : \ + (GetExitCodeThread((thr), (LPDWORD)(statusp)) ? 0 : -1))) + +typedef HANDLE mutex_t; +#define mutex_init(m, attr) \ + (((*(m) = CreateMutex(NULL, FALSE, NULL)) != NULL) ? 0 : -1) +#define mutex_lock(m) \ + ((WaitForSingleObject(*(m), INFINITE) == WAIT_OBJECT_0) ? 0 : -1) +#define mutex_unlock(m) (ReleaseMutex(*(m)) ? 0 : -1) +#else +#include <pthread.h> +#include <unistd.h> +#define PATHD '/' + +typedef pthread_t thread_t; +#define thread_create(thrp, attr, func, arg) \ + pthread_create((thrp), (attr), (func), (arg)) +#define thread_join(thr, statusp) pthread_join((thr), (statusp)) + +typedef pthread_mutex_t mutex_t; +#define mutex_init(m, attr) pthread_mutex_init((m), (attr)) +#define mutex_lock(m) pthread_mutex_lock(m) +#define mutex_unlock(m) pthread_mutex_unlock(m) +#endif + +// Run 5 writers threads at a time. +#define NUMWRITERS 5 +using namespace dbstl; +typedef db_multimap<const char *, int, ElementHolder<int> > strmap_t; +// Printing of thread_t is implementation-specific, so we +// create our own thread IDs for reporting purposes. +int global_thread_num; +mutex_t thread_num_lock; + +// Forward declarations +int countRecords(strmap_t *); +int openDb(Db **, const char *, const char *, DbEnv *, u_int32_t); +int usage(void); +void *writerThread(void *); + +// Usage function +int +usage() +{ + std::cerr << " [-h <database_home_directory>] [-m (in memory use)]" + << std::endl; + return (EXIT_FAILURE); +} + +int +main(int argc, char *argv[]) +{ + // Initialize our handles + Db *dbp = NULL; + DbEnv *envp = NULL; + + thread_t writerThreads[NUMWRITERS]; + int i, inmem; + u_int32_t envFlags; + const char *dbHomeDir; + + inmem = 0; + // Application name + const char *progName = "TxnGuideStl"; + + // Database file name + const char *fileName = "mydb.db"; + + // Parse the command line arguments +#ifdef _WIN32 + dbHomeDir = ".\\TESTDIR"; +#else + dbHomeDir = "./TESTDIR"; +#endif + + // Env open flags + envFlags = + DB_CREATE | // Create the environment if it does not exist + DB_RECOVER | // Run normal recovery. + DB_INIT_LOCK | // Initialize the locking subsystem + DB_INIT_LOG | // Initialize the logging subsystem + DB_INIT_TXN | // Initialize the transactional subsystem. This + // also turns on logging. + DB_INIT_MPOOL | // Initialize the memory pool (in-memory cache) + DB_THREAD; // Cause the environment to be free-threaded + + try { + // Create and open the environment + envp = new DbEnv(DB_CXX_NO_EXCEPTIONS); + + // Indicate that we want db to internally perform deadlock + // detection. Also indicate that the transaction with + // the fewest number of write locks will receive the + // deadlock notification in the event of a deadlock. + envp->set_lk_detect(DB_LOCK_MINWRITE); + + if (inmem) { + envp->set_lg_bsize(64 * 1024 * 1024); + envp->open(NULL, envFlags, 0644); + fileName = NULL; + } else + envp->open(dbHomeDir, envFlags, 0644); + + // If we had utility threads (for running checkpoints or + // deadlock detection, for example) we would spawn those + // here. However, for a simple example such as this, + // that is not required. + + // Open the database + openDb(&dbp, progName, fileName, + envp, DB_DUP); + + // Call this function before any use of dbstl in a single thread + // if multiple threads are using dbstl. + dbstl::dbstl_startup(); + + // We created the dbp and envp handles not via dbstl::open_db/open_env + // functions, so we must register the handles in each thread using the + // container. + dbstl::register_db(dbp); + dbstl::register_db_env(envp); + + strmap_t *strmap = new strmap_t(dbp, envp); + // Initialize a mutex. Used to help provide thread ids. + (void)mutex_init(&thread_num_lock, NULL); + + // Start the writer threads. + for (i = 0; i < NUMWRITERS; i++) + (void)thread_create(&writerThreads[i], NULL, + writerThread, (void *)strmap); + + + // Join the writers + for (i = 0; i < NUMWRITERS; i++) + (void)thread_join(writerThreads[i], NULL); + + delete strmap; + + } catch(DbException &e) { + std::cerr << "Error opening database environment: " + << (inmem ? "NULL" : dbHomeDir) << std::endl; + std::cerr << e.what() << std::endl; + dbstl_exit(); + return (EXIT_FAILURE); + } + + // Environment and database will be automatically closed by dbstl. + + // Final status message and return. + + std::cout << "I'm all done." << std::endl; + + dbstl_exit(); + delete envp; + return (EXIT_SUCCESS); +} + +// A function that performs a series of writes to a +// Berkeley DB database. The information written +// to the database is largely nonsensical, but the +// mechanism of transactional commit/abort and +// deadlock detection is illustrated here. +void * +writerThread(void *args) +{ + int j, thread_num; + int max_retries = 1; // Max retry on a deadlock + const char *key_strings[] = {"key 1", "key 2", "key 3", "key 4", + "key 5", "key 6", "key 7", "key 8", + "key 9", "key 10"}; + + strmap_t *strmap = (strmap_t *)args; + DbEnv *envp = strmap->get_db_env_handle(); + + // We created the dbp and envp handles not via dbstl::open_db/open_env + // functions, so we must register the handles in each thread using the + // container. + dbstl::register_db(strmap->get_db_handle()); + dbstl::register_db_env(envp); + + // Get the thread number + (void)mutex_lock(&thread_num_lock); + global_thread_num++; + thread_num = global_thread_num; + (void)mutex_unlock(&thread_num_lock); + + // Initialize the random number generator + srand(thread_num); + + // Perform 50 transactions + for (int i = 0; i < 1; i++) { + DbTxn *txn; + int retry = 100; + int retry_count = 0, payload; + // while loop is used for deadlock retries + while (retry--) { + // try block used for deadlock detection and + // general db exception handling + try { + + // Begin our transaction. We group multiple writes in + // this thread under a single transaction so as to + // (1) show that you can atomically perform multiple + // writes at a time, and (2) to increase the chances + // of a deadlock occurring so that we can observe our + // deadlock detection at work. + + // Normally we would want to avoid the potential for + // deadlocks, so for this workload the correct thing + // would be to perform our puts with autocommit. But + // that would excessively simplify our example, so we + // do the "wrong" thing here instead. + txn = dbstl::begin_txn(0, envp); + + // Perform the database write for this transaction. + for (j = 0; j < 10; j++) { + payload = rand() + i; + strmap->insert(make_pair(key_strings[j], payload)); + } + + // countRecords runs a cursor over the entire database. + // We do this to illustrate issues of deadlocking + std::cout << thread_num << " : Found " + << countRecords(strmap) + << " records in the database." << std::endl; + + std::cout << thread_num << " : committing txn : " << i + << std::endl; + + // commit + try { + dbstl::commit_txn(envp); + } catch (DbException &e) { + std::cout << "Error on txn commit: " + << e.what() << std::endl; + } + } catch (DbDeadlockException &) { + // First thing that we MUST do is abort the transaction. + try { + dbstl::abort_txn(envp); + } catch (DbException ex1) { + std::cout<<ex1.what(); + } + + // Now we decide if we want to retry the operation. + // If we have retried less than max_retries, + // increment the retry count and goto retry. + if (retry_count < max_retries) { + std::cout << "############### Writer " << thread_num + << ": Got DB_LOCK_DEADLOCK.\n" + << "Retrying write operation." + << std::endl; + retry_count++; + + } else { + // Otherwise, just give up. + std::cerr << "Writer " << thread_num + << ": Got DeadLockException and out of " + << "retries. Giving up." << std::endl; + retry = 0; + } + } catch (DbException &e) { + std::cerr << "db_map<> storage failed" << std::endl; + std::cerr << e.what() << std::endl; + dbstl::abort_txn(envp); + retry = 0; + } catch (std::exception &ee) { + std::cerr << "Unknown exception: " << ee.what() << std::endl; + return (0); + } + } + } + return (0); +} + + +// This simply counts the number of records contained in the +// database and returns the result. +// +// Note that this method exists only for illustrative purposes. +// A more straight-forward way to count the number of records in +// a database is to use the db_map<>::size() method. +int +countRecords(strmap_t *strmap) +{ + + int count = 0; + strmap_t::iterator itr; + try { + // Set the flag used by Db::cursor. + for (itr = strmap->begin(); itr != strmap->end(); ++itr) + count++; + } catch (DbDeadlockException &de) { + std::cerr << "countRecords: got deadlock" << std::endl; + // itr's cursor will be automatically closed when it is destructed. + throw de; + } catch (DbException &e) { + std::cerr << "countRecords error:" << std::endl; + std::cerr << e.what() << std::endl; + } + + // itr's cursor will be automatically closed when it is destructed. + + return (count); +} + + +// Open a Berkeley DB database +int +openDb(Db **dbpp, const char *progname, const char *fileName, + DbEnv *envp, u_int32_t extraFlags) +{ + int ret; + u_int32_t openFlags; + + try { + Db *dbp = new Db(envp, DB_CXX_NO_EXCEPTIONS); + + // Point to the new'd Db. + *dbpp = dbp; + + if (extraFlags != 0) + ret = dbp->set_flags(extraFlags); + + // Now open the database. + openFlags = DB_CREATE | // Allow database creation + DB_READ_UNCOMMITTED | // Allow uncommitted reads + DB_AUTO_COMMIT; // Allow autocommit + + dbp->open(NULL, // Txn pointer + fileName, // File name + NULL, // Logical db name + DB_BTREE, // Database type (using btree) + openFlags, // Open flags + 0); // File mode. Using defaults + } catch (DbException &e) { + std::cerr << progname << ": openDb: db open failed:" << std::endl; + std::cerr << e.what() << std::endl; + return (EXIT_FAILURE); + } + + return (EXIT_SUCCESS); +} + diff --git a/examples_stl/repquote/README b/examples_stl/repquote/README new file mode 100644 index 0000000..86d0759 --- /dev/null +++ b/examples_stl/repquote/README @@ -0,0 +1,17 @@ +# $Id$ + +This is the directory for the replication example program. + +The example is a toy stock quote server. It uses the replication manager to +make use of DB replication, and uses STL API to acces the database. + + +StlRepQuoteExample.cpp + Contains code to implement the basic functions of the + application, and code necessary to configure the + application to use Replication Manager. + +StlRepConfigInfo.h +StlRepConfigInfo.cpp + Contains code to manage user configurations to this program, + including those to the DB replication manager. diff --git a/examples_stl/repquote/StlRepConfigInfo.cpp b/examples_stl/repquote/StlRepConfigInfo.cpp new file mode 100644 index 0000000..a80847a --- /dev/null +++ b/examples_stl/repquote/StlRepConfigInfo.cpp @@ -0,0 +1,56 @@ +/*- + * See the file LICENSE for redistribution information. + * + * Copyright (c) 2001-2009 Oracle. All rights reserved. + * + * $Id$ + */ + +#include "StlRepConfigInfo.h" + +#include <cstdlib> + +RepConfigInfo::RepConfigInfo() +{ + start_policy = DB_REP_ELECTION; + home = "TESTDIR"; + got_listen_address = false; + totalsites = 0; + priority = 100; + verbose = false; + other_hosts = NULL; + ack_policy = DB_REPMGR_ACKS_QUORUM; + bulk = false; +} + +RepConfigInfo::~RepConfigInfo() +{ + // release any other_hosts structs. + if (other_hosts != NULL) { + REP_HOST_INFO *CurItem = other_hosts; + while (CurItem->next != NULL) + { + REP_HOST_INFO *TmpItem = CurItem->next; + free(CurItem); + CurItem = TmpItem; + } + free(CurItem); + } + other_hosts = NULL; +} + +void RepConfigInfo::addOtherHost(char* host, int port, bool peer) +{ + REP_HOST_INFO *newinfo; + newinfo = (REP_HOST_INFO*)malloc(sizeof(REP_HOST_INFO)); + newinfo->host = host; + newinfo->port = port; + newinfo->peer = peer; + if (other_hosts == NULL) { + other_hosts = newinfo; + newinfo->next = NULL; + } else { + newinfo->next = other_hosts; + other_hosts = newinfo; + } +} diff --git a/examples_stl/repquote/StlRepConfigInfo.h b/examples_stl/repquote/StlRepConfigInfo.h new file mode 100644 index 0000000..a4f36dc --- /dev/null +++ b/examples_stl/repquote/StlRepConfigInfo.h @@ -0,0 +1,36 @@ +/*- + * See the file LICENSE for redistribution information. + * + * Copyright (c) 2001-2009 Oracle. All rights reserved. + * + * $Id$ + */ +#include <db_cxx.h> + +// Chainable struct used to store host information. +typedef struct RepHostInfoObj{ + char* host; + int port; + bool peer; // only relevant for "other" hosts + RepHostInfoObj* next; // used for chaining multiple "other" hosts. +} REP_HOST_INFO; + +class RepConfigInfo { +public: + RepConfigInfo(); + virtual ~RepConfigInfo(); + + void addOtherHost(char* host, int port, bool peer); +public: + u_int32_t start_policy; + const char* home; + bool got_listen_address; + REP_HOST_INFO this_host; + int totalsites; + int priority; + bool verbose; + // used to store a set of optional other hosts. + REP_HOST_INFO *other_hosts; + int ack_policy; + bool bulk; +}; diff --git a/examples_stl/repquote/StlRepQuoteExample.cpp b/examples_stl/repquote/StlRepQuoteExample.cpp new file mode 100644 index 0000000..bf73d98 --- /dev/null +++ b/examples_stl/repquote/StlRepQuoteExample.cpp @@ -0,0 +1,712 @@ +/*- + * See the file LICENSE for redistribution information. + * + * Copyright (c) 2001-2009 Oracle. All rights reserved. + * + * $Id$ + */ + +/* + * In this application, we specify all communication via the command line. In + * a real application, we would expect that information about the other sites + * in the system would be maintained in some sort of configuration file. The + * critical part of this interface is that we assume at startup that we can + * find out + * 1) what our Berkeley DB home environment is, + * 2) what host/port we wish to listen on for connections; and + * 3) an optional list of other sites we should attempt to connect to. + * + * These pieces of information are expressed by the following flags. + * -h home (required; h stands for home directory) + * -l host:port (required; l stands for local) + * -C or -M (optional; start up as client or master) + * -r host:port (optional; r stands for remote; any number of these may be + * specified) + * -R host:port (optional; R stands for remote peer; only one of these may + * be specified) + * -a all|quorum (optional; a stands for ack policy) + * -b (optional; b stands for bulk) + * -n nsites (optional; number of sites in replication group; defaults to 0 + * to try to dynamically compute nsites) + * -p priority (optional; defaults to 100) + * -v (optional; v stands for verbose) + */ + +#include <iostream> +#include <string> +#include <sstream> + +#include <db_cxx.h> +#include "StlRepConfigInfo.h" +#include "dbstl_map.h" + +using std::cout; +using std::cin; +using std::cerr; +using std::endl; +using std::flush; +using std::istream; +using std::istringstream; +using std::string; +using std::getline; +using namespace dbstl; +#define CACHESIZE (10 * 1024 * 1024) +#define DATABASE "quote.db" + +const char *progname = "exstl_repquote"; + +#include <errno.h> +#ifdef _WIN32 +#define WIN32_LEAN_AND_MEAN +#include <windows.h> +#define snprintf _snprintf +#define sleep(s) Sleep(1000 * (s)) + +extern "C" { +extern int getopt(int, char * const *, const char *); +extern char *optarg; +extern int optind; +} + +typedef HANDLE thread_t; +typedef DWORD thread_exit_status_t; +#define thread_create(thrp, attr, func, arg) \ + (((*(thrp) = CreateThread(NULL, 0, \ + (LPTHREAD_START_ROUTINE)(func), (arg), 0, NULL)) == NULL) ? -1 : 0) +#define thread_join(thr, statusp) \ + ((WaitForSingleObject((thr), INFINITE) == WAIT_OBJECT_0) && \ + GetExitCodeThread((thr), (LPDWORD)(statusp)) ? 0 : -1) +#else /* !_WIN32 */ +#include <pthread.h> + +typedef pthread_t thread_t; +typedef void* thread_exit_status_t; +#define thread_create(thrp, attr, func, arg) \ + pthread_create((thrp), (attr), (func), (arg)) +#define thread_join(thr, statusp) pthread_join((thr), (statusp)) +#endif + +// Struct used to store information in Db app_private field. +typedef struct { + bool app_finished; + bool in_client_sync; + bool is_master; +} APP_DATA; + +static void log(const char *); +void *checkpoint_thread (void *); +void *log_archive_thread (void *); + +class RepQuoteExample +{ +public: + typedef db_map<char *, char *, ElementHolder<char *> > str_map_t; + RepQuoteExample(); + void init(RepConfigInfo* config); + void doloop(); + int terminate(); + + static void event_callback(DbEnv * dbenv, u_int32_t which, void *info); + +private: + // disable copy constructor. + RepQuoteExample(const RepQuoteExample &); + void operator = (const RepQuoteExample &); + + // internal data members. + APP_DATA app_data; + RepConfigInfo *app_config; + DbEnv *cur_env; + Db *dbp; + str_map_t *strmap; + thread_t ckp_thr; + thread_t lga_thr; + + // private methods. + void print_stocks(); + void prompt(); + bool open_db(bool creating); + void close_db(){ + delete strmap; + strmap = NULL; + dbstl::close_db(dbp); + dbp = NULL; + } + static void close_db(Db *&);// Close an unregistered Db handle. +}; + +bool RepQuoteExample::open_db(bool creating) +{ + int ret; + + if (dbp) + return true; + + dbp = new Db(cur_env, DB_CXX_NO_EXCEPTIONS); + + u_int32_t flags = DB_AUTO_COMMIT | DB_THREAD; + if (creating) + flags |= DB_CREATE; + + ret = dbp->open(NULL, DATABASE, NULL, DB_BTREE, flags, 0); + switch (ret) { + case 0: + register_db(dbp); + if (strmap) + delete strmap; + strmap = new str_map_t(dbp, cur_env); + return (true); + case DB_LOCK_DEADLOCK: // Fall through + case DB_REP_HANDLE_DEAD: + log("\nFailed to open stock db."); + break; + default: + if (ret == DB_REP_LOCKOUT) + break; // Fall through + else if (ret == ENOENT && !creating) + log("\nStock DB does not yet exist\n"); + else { + DbException ex(ret); + throw ex; + } + } // switch + + // (All retryable errors fall through to here.) + // + log("\nPlease retry the operation"); + close_db(dbp); + return (false); +} + +void RepQuoteExample::close_db(Db *&dbp) +{ + if (dbp) { + try { + dbp->close(0); + delete dbp; + dbp = 0; + } catch (...) { + delete dbp; + dbp = 0; + throw; + } + } + +} + +RepQuoteExample::RepQuoteExample() : app_config(0), cur_env(NULL) { + app_data.app_finished = 0; + app_data.in_client_sync = 0; + app_data.is_master = 0; // assume I start out as client + cur_env = new DbEnv(DB_CXX_NO_EXCEPTIONS); + strmap = NULL; + dbp = NULL; +} + +void RepQuoteExample::init(RepConfigInfo *config) { + app_config = config; + + cur_env->set_app_private(&app_data); + cur_env->set_errfile(stderr); + cur_env->set_errpfx(progname); + cur_env->set_event_notify(event_callback); + + // Configure bulk transfer to send groups of records to clients + // in a single network transfer. This is useful for master sites + // and clients participating in client-to-client synchronization. + // + if (app_config->bulk) + cur_env->rep_set_config(DB_REP_CONF_BULK, 1); + + + // Set the total number of sites in the replication group. + // This is used by repmgr internal election processing. + // + if (app_config->totalsites > 0) + cur_env->rep_set_nsites(app_config->totalsites); + + // Turn on debugging and informational output if requested. + if (app_config->verbose) + cur_env->set_verbose(DB_VERB_REPLICATION, 1); + + // Set replication group election priority for this environment. + // An election first selects the site with the most recent log + // records as the new master. If multiple sites have the most + // recent log records, the site with the highest priority value + // is selected as master. + // + cur_env->rep_set_priority(app_config->priority); + + // Set the policy that determines how master and client sites + // handle acknowledgement of replication messages needed for + // permanent records. The default policy of "quorum" requires only + // a quorum of electable peers sufficient to ensure a permanent + // record remains durable if an election is held. The "all" option + // requires all clients to acknowledge a permanent replication + // message instead. + // + cur_env->repmgr_set_ack_policy(app_config->ack_policy); + + // Set the threshold for the minimum and maximum time the client + // waits before requesting retransmission of a missing message. + // Base these values on the performance and load characteristics + // of the master and client host platforms as well as the round + // trip message time. + // + cur_env->rep_set_request(20000, 500000); + + // Configure deadlock detection to ensure that any deadlocks + // are broken by having one of the conflicting lock requests + // rejected. DB_LOCK_DEFAULT uses the lock policy specified + // at environment creation time or DB_LOCK_RANDOM if none was + // specified. + // + cur_env->set_lk_detect(DB_LOCK_DEFAULT); + + // The following base replication features may also be useful to your + // application. See Berkeley DB documentation for more details. + // - Master leases: Provide stricter consistency for data reads + // on a master site. + // - Timeouts: Customize the amount of time Berkeley DB waits + // for such things as an election to be concluded or a master + // lease to be granted. + // - Delayed client synchronization: Manage the master site's + // resources by spreading out resource-intensive client + // synchronizations. + // - Blocked client operations: Return immediately with an error + // instead of waiting indefinitely if a client operation is + // blocked by an ongoing client synchronization. + + cur_env->repmgr_set_local_site(app_config->this_host.host, + app_config->this_host.port, 0); + + for ( REP_HOST_INFO *cur = app_config->other_hosts; cur != NULL; + cur = cur->next) { + cur_env->repmgr_add_remote_site(cur->host, cur->port, + NULL, cur->peer ? DB_REPMGR_PEER : 0); + } + + // Configure heartbeat timeouts so that repmgr monitors the + // health of the TCP connection. Master sites broadcast a heartbeat + // at the frequency specified by the DB_REP_HEARTBEAT_SEND timeout. + // Client sites wait for message activity the length of the + // DB_REP_HEARTBEAT_MONITOR timeout before concluding that the + // connection to the master is lost. The DB_REP_HEARTBEAT_MONITOR + // timeout should be longer than the DB_REP_HEARTBEAT_SEND timeout. + // + cur_env->rep_set_timeout(DB_REP_HEARTBEAT_SEND, 5000000); + cur_env->rep_set_timeout(DB_REP_HEARTBEAT_MONITOR, 10000000); + + // The following repmgr features may also be useful to your + // application. See Berkeley DB documentation for more details. + // - Two-site strict majority rule - In a two-site replication + // group, require both sites to be available to elect a new + // master. + // - Timeouts - Customize the amount of time repmgr waits + // for such things as waiting for acknowledgements or attempting + // to reconnect to other sites. + // - Site list - return a list of sites currently known to repmgr. + + // We can now open our environment, although we're not ready to + // begin replicating. However, we want to have a dbenv around + // so that we can send it into any of our message handlers. + cur_env->set_cachesize(0, CACHESIZE, 0); + cur_env->set_flags(DB_TXN_NOSYNC, 1); + + cur_env->open(app_config->home, DB_CREATE | DB_RECOVER | + DB_THREAD | DB_INIT_REP | DB_INIT_LOCK | DB_INIT_LOG | + DB_INIT_MPOOL | DB_INIT_TXN, 0); + + // Start checkpoint and log archive support threads. + (void)thread_create(&ckp_thr, NULL, checkpoint_thread, cur_env); + (void)thread_create(&lga_thr, NULL, log_archive_thread, cur_env); + + dbstl::register_db_env(cur_env); + cur_env->repmgr_start(3, app_config->start_policy); +} + +int RepQuoteExample::terminate() { + try { + // Wait for checkpoint and log archive threads to finish. + // Windows does not allow NULL pointer for exit code variable. + thread_exit_status_t exstat; + + (void)thread_join(lga_thr, &exstat); + (void)thread_join(ckp_thr, &exstat); + + // We have used the DB_TXN_NOSYNC environment flag for + // improved performance without the usual sacrifice of + // transactional durability, as discussed in the + // "Transactional guarantees" page of the Reference + // Guide: if one replication site crashes, we can + // expect the data to exist at another site. However, + // in case we shut down all sites gracefully, we push + // out the end of the log here so that the most + // recent transactions don't mysteriously disappear. + cur_env->log_flush(NULL); + } catch (DbException dbe) { + cout << "\nerror closing environment: " << dbe.what() << endl; + } + return 0; +} + +void RepQuoteExample::prompt() { + cout << "QUOTESERVER"; + if (!app_data.is_master) + cout << "(read-only)"; + cout << "> " << flush; +} + +void log(const char *msg) { + cerr << msg << endl; +} + +// Simple command-line user interface: +// - enter "<stock symbol> <price>" to insert or update a record in the +// database; +// - just press Return (i.e., blank input line) to print out the contents of +// the database; +// - enter "quit" or "exit" to quit. +// +void RepQuoteExample::doloop() { + string input; + + while (prompt(), getline(cin, input)) { + istringstream is(input); + string token1, token2; + + // Read 0, 1 or 2 tokens from the input. + // + int count = 0; + if (is >> token1) { + count++; + if (is >> token2) + count++; + } + + if (count == 1) { + if (token1 == "exit" || token1 == "quit") { + app_data.app_finished = 1; + break; + } else { + log("\nFormat: <stock> <price>\n"); + continue; + } + } + + // Here we know count is either 0 or 2, so we're about to try a + // DB operation. + // + // Open database with DB_CREATE only if this is a master + // database. A client database uses polling to attempt + // to open the database without DB_CREATE until it is + // successful. + // + // This DB_CREATE polling logic can be simplified under + // some circumstances. For example, if the application can + // be sure a database is already there, it would never need + // to open it with DB_CREATE. + // + if (!open_db(app_data.is_master)) + continue; + + try { + if (count == 0) + if (app_data.in_client_sync) + log( + "Cannot read data during client initialization - please try again."); + else + print_stocks(); + else if (!app_data.is_master) + log("\nCan't update at client\n"); + else { + char *symbol = new char[token1.length() + 1]; + strcpy(symbol, token1.c_str()); + char *price = new char[token2.length() + 1]; + strcpy(price, token2.c_str()); + begin_txn(0, cur_env); + strmap->insert(make_pair(symbol, price)); + commit_txn(cur_env); + delete symbol; + delete price; + } + } catch (DbDeadlockException e) { + log("\nplease retry the operation\n"); + close_db(); + } catch (DbRepHandleDeadException e) { + log("\nplease retry the operation\n"); + close_db(); + } catch (DbException e) { + if (e.get_errno() == DB_REP_LOCKOUT) { + log("\nplease retry the operation\n"); + close_db(); + } else + throw; + } + + } + + close_db(); +} + +void RepQuoteExample::event_callback(DbEnv* dbenv, u_int32_t which, void *info) +{ + APP_DATA *app = (APP_DATA*)dbenv->get_app_private(); + + info = NULL; /* Currently unused. */ + + switch (which) { + case DB_EVENT_REP_MASTER: + app->in_client_sync = 0; + app->is_master = 1; + break; + + case DB_EVENT_REP_CLIENT: + app->is_master = 0; + app->in_client_sync = 1; + break; + + case DB_EVENT_REP_STARTUPDONE: + app->in_client_sync = 0; + break; + case DB_EVENT_REP_NEWMASTER: + app->in_client_sync = 1; + break; + + case DB_EVENT_REP_PERM_FAILED: + // Did not get enough acks to guarantee transaction + // durability based on the configured ack policy. This + // transaction will be flushed to the master site's + // local disk storage for durability. + // + log( + "Insufficient acknowledgements to guarantee transaction durability."); + break; + + default: + dbenv->errx("\nignoring event %d", which); + } +} + +void RepQuoteExample::print_stocks() { +#define MAXKEYSIZE 10 +#define MAXDATASIZE 20 + + cout << "\tSymbol\tPrice" << endl + << "\t======\t=====" << endl; + str_map_t::iterator itr; + if (strmap == NULL) + strmap = new str_map_t(dbp, cur_env); + begin_txn(0, cur_env); + for (itr = strmap->begin(); itr != strmap->end(); ++itr) + cout<<"\t"<<itr->first<<"\t"<<itr->second<<endl; + commit_txn(cur_env); + cout << endl << flush; +} + +static void usage() { + cerr << "usage: " << progname << endl + << "[-h home][-o host:port][-m host:port][-f host:port]" + << "[-n nsites][-p priority]" << endl; + + cerr << "\t -h home (required; h stands for home directory)" << endl + << "\t -l host:port (required; l stands for local)" << endl + << "\t -C or -M (optional; start up as client or master)" << endl + << "\t -r host:port (optional; r stands for remote; any " + << "number of these" << endl + << "\t may be specified)" << endl + << "\t -R host:port (optional; R stands for remote peer; only " + << "one of" << endl + << "\t these may be specified)" << endl + << "\t -a all|quorum (optional; a stands for ack policy)" << endl + << "\t -b (optional; b stands for bulk)" << endl + << "\t -n nsites (optional; number of sites in replication " + << "group; defaults " << endl + << "\t to 0 to try to dynamically compute nsites)" << endl + << "\t -p priority (optional; defaults to 100)" << endl + << "\t -v (optional; v stands for verbose)" << endl; + + exit(EXIT_FAILURE); +} + +int main(int argc, char **argv) { + RepConfigInfo config; + char ch, *portstr, *tmphost; + int tmpport; + bool tmppeer; + + // Extract the command line parameters + while ((ch = getopt(argc, argv, "a:bCh:l:Mn:p:R:r:v")) != EOF) { + tmppeer = false; + switch (ch) { + case 'a': + if (strncmp(optarg, "all", 3) == 0) + config.ack_policy = DB_REPMGR_ACKS_ALL; + else if (strncmp(optarg, "quorum", 6) != 0) + usage(); + break; + case 'b': + config.bulk = true; + break; + case 'C': + config.start_policy = DB_REP_CLIENT; + break; + case 'h': + config.home = optarg; + break; + case 'l': + config.this_host.host = strtok(optarg, ":"); + if ((portstr = strtok(NULL, ":")) == NULL) { + cerr << "\nBad host specification." << endl; + usage(); + } + config.this_host.port = (unsigned short)atoi(portstr); + config.got_listen_address = true; + break; + case 'M': + config.start_policy = DB_REP_MASTER; + break; + case 'n': + config.totalsites = atoi(optarg); + break; + case 'p': + config.priority = atoi(optarg); + break; + case 'R': + tmppeer = true; // FALLTHROUGH + case 'r': + tmphost = strtok(optarg, ":"); + if ((portstr = strtok(NULL, ":")) == NULL) { + cerr << "Bad host specification." << endl; + usage(); + } + tmpport = (unsigned short)atoi(portstr); + + config.addOtherHost(tmphost, tmpport, tmppeer); + + break; + case 'v': + config.verbose = true; + break; + case '?': + default: + usage(); + } + } + + // Error check command line. + if ((!config.got_listen_address) || config.home == NULL) + usage(); + + RepQuoteExample runner; + try { + runner.init(&config); + runner.doloop(); + } catch (DbException dbe) { + cerr << "\nCaught an exception during initialization or" + << " processing: " << dbe.what() << endl; + } + runner.terminate(); + return 0; +} + +// This is a very simple thread that performs checkpoints at a fixed +// time interval. For a master site, the time interval is one minute +// plus the duration of the checkpoint_delay timeout (30 seconds by +// default.) For a client site, the time interval is one minute. +// +void *checkpoint_thread(void *args) +{ + DbEnv *env; + APP_DATA *app; + int i, ret; + + env = (DbEnv *)args; + app = (APP_DATA *)env->get_app_private(); + + for (;;) { + // Wait for one minute, polling once per second to see if + // application has finished. When application has finished, + // terminate this thread. + // + for (i = 0; i < 60; i++) { + sleep(1); + if (app->app_finished == 1) + return ((void *)EXIT_SUCCESS); + } + + // Perform a checkpoint. + if ((ret = env->txn_checkpoint(0, 0, 0)) != 0) { + env->err(ret, "Could not perform checkpoint.\n"); + return ((void *)EXIT_FAILURE); + } + } +} + +// This is a simple log archive thread. Once per minute, it removes all but +// the most recent 3 logs that are safe to remove according to a call to +// DBENV->log_archive(). +// +// Log cleanup is needed to conserve disk space, but aggressive log cleanup +// can cause more frequent client initializations if a client lags too far +// behind the current master. This can happen in the event of a slow client, +// a network partition, or a new master that has not kept as many logs as the +// previous master. +// +// The approach in this routine balances the need to mitigate against a +// lagging client by keeping a few more of the most recent unneeded logs +// with the need to conserve disk space by regularly cleaning up log files. +// Use of automatic log removal (DBENV->log_set_config() DB_LOG_AUTO_REMOVE +// flag) is not recommended for replication due to the risk of frequent +// client initializations. +// +void *log_archive_thread(void *args) +{ + DbEnv *env; + APP_DATA *app; + char **begin, **list; + int i, listlen, logs_to_keep, minlog, ret; + + env = (DbEnv *)args; + app = (APP_DATA *)env->get_app_private(); + logs_to_keep = 3; + + for (;;) { + // Wait for one minute, polling once per second to see if + // application has finished. When application has finished, + // terminate this thread. + // + for (i = 0; i < 60; i++) { + sleep(1); + if (app->app_finished == 1) + return ((void *)EXIT_SUCCESS); + } + + // Get the list of unneeded log files. + if ((ret = env->log_archive(&list, DB_ARCH_ABS)) != 0) { + env->err(ret, "Could not get log archive list."); + return ((void *)EXIT_FAILURE); + } + if (list != NULL) { + listlen = 0; + // Get the number of logs in the list. + for (begin = list; *begin != NULL; begin++, listlen++); + // Remove all but the logs_to_keep most recent + // unneeded log files. + // + minlog = listlen - logs_to_keep; + for (begin = list, i= 0; i < minlog; list++, i++) { + if ((ret = unlink(*list)) != 0) { + env->err(ret, + "logclean: remove %s", *list); + env->errx( + "logclean: Error remove %s", *list); + free(begin); + return ((void *)EXIT_FAILURE); + } + } + free(begin); + } + } +} + |