summary refs log tree commit diff
path: root/db/rep/rep_method.c
diff options
context:
space:
mode:
Diffstat (limited to 'db/rep/rep_method.c')
-rw-r--r-- db/rep/rep_method.c 1444
1 file changed, 760 insertions, 684 deletions
diff --git a/db/rep/rep_method.c b/db/rep/rep_method.c
index 9e83dd729..749123e5f 100644
--- a/db/rep/rep_method.c
+++ b/db/rep/rep_method.c
@@ -1,115 +1,232 @@
/*-
* See the file LICENSE for redistribution information.
*
- * Copyright (c) 2001-2004
- * Sleepycat Software. All rights reserved.
+ * Copyright (c) 2001-2006
+ * Oracle Corporation. All rights reserved.
*
- * $Id: rep_method.c,v 1.167 2004/10/07 17:20:12 bostic Exp $
+ * $Id: rep_method.c,v 12.46 2006/09/09 14:19:20 bostic Exp $
*/
#include "db_config.h"
-#ifndef NO_SYSTEM_INCLUDES
-#include <sys/types.h>
-
-#ifdef HAVE_RPC
-#include <rpc/rpc.h>
-#endif
-
-#include <stdlib.h>
-#include <string.h>
-#endif
-
-#ifdef HAVE_RPC
-#include "db_server.h"
-#endif
-
#include "db_int.h"
#include "dbinc/db_page.h"
#include "dbinc/btree.h"
#include "dbinc/log.h"
#include "dbinc/txn.h"
-#ifdef HAVE_RPC
-#include "dbinc_auto/rpc_client_ext.h"
-#endif
-
-static int __rep_abort_prepared __P((DB_ENV *));
-static int __rep_bt_cmp __P((DB *, const DBT *, const DBT *));
-static int __rep_elect
- __P((DB_ENV *, int, int, int, u_int32_t, int *, u_int32_t));
-static int __rep_elect_init
- __P((DB_ENV *, DB_LSN *, int, int, int, int *, u_int32_t *));
-static int __rep_flush __P((DB_ENV *));
-static int __rep_restore_prepared __P((DB_ENV *));
-static int __rep_get_limit __P((DB_ENV *, u_int32_t *, u_int32_t *));
-static int __rep_set_limit __P((DB_ENV *, u_int32_t, u_int32_t));
-static int __rep_set_request __P((DB_ENV *, u_int32_t, u_int32_t));
-static int __rep_set_rep_transport __P((DB_ENV *, int,
- int (*)(DB_ENV *, const DBT *, const DBT *, const DB_LSN *,
- int, u_int32_t)));
-static int __rep_start __P((DB_ENV *, DBT *, u_int32_t));
-static int __rep_wait __P((DB_ENV *, u_int32_t, int *, u_int32_t));
+static int __rep_abort_prepared __P((DB_ENV *));
+static int __rep_bt_cmp __P((DB *, const DBT *, const DBT *));
+static void __rep_config_map __P((DB_ENV *, u_int32_t *, u_int32_t *));
+static u_int32_t __rep_conv_vers __P((DB_ENV *, u_int32_t));
+static int __rep_restore_prepared __P((DB_ENV *));
/*
* __rep_dbenv_create --
* Replication-specific initialization of the DB_ENV structure.
*
- * PUBLIC: void __rep_dbenv_create __P((DB_ENV *));
+ * PUBLIC: int __rep_dbenv_create __P((DB_ENV *));
*/
-void
+int
__rep_dbenv_create(dbenv)
DB_ENV *dbenv;
{
-#ifdef HAVE_RPC
- if (F_ISSET(dbenv, DB_ENV_RPCCLIENT)) {
- dbenv->rep_elect = __dbcl_rep_elect;
- dbenv->rep_flush = __dbcl_rep_flush;
- dbenv->rep_process_message = __dbcl_rep_process_message;
- dbenv->rep_start = __dbcl_rep_start;
- dbenv->rep_stat = __dbcl_rep_stat;
- dbenv->rep_stat_print = NULL;
- dbenv->get_rep_limit = __dbcl_rep_get_limit;
- dbenv->set_rep_limit = __dbcl_rep_set_limit;
- dbenv->set_rep_request = __dbcl_rep_set_request;
- dbenv->set_rep_transport = __dbcl_rep_set_rep_transport;
+ DB_REP *db_rep;
+ int ret;
- } else
+ if ((ret = __os_calloc(dbenv, 1, sizeof(DB_REP), &db_rep)) != 0)
+ return (ret);
+
+ db_rep->eid = DB_EID_INVALID;
+ db_rep->request_gap = DB_REP_REQUEST_GAP;
+ db_rep->max_gap = DB_REP_MAX_GAP;
+
+#ifdef HAVE_REPLICATION_THREADS
+ if ((ret = __repmgr_dbenv_create(dbenv, db_rep)) != 0) {
+ __os_free(dbenv, db_rep);
+ return (ret);
+ }
+#endif
+
+ dbenv->rep_handle = db_rep;
+ return (0);
+}
+
+/*
+ * __rep_dbenv_destroy --
+ * Replication-specific destruction of the DB_ENV structure.
+ *
+ * PUBLIC: void __rep_dbenv_destroy __P((DB_ENV *));
+ */
+void
+__rep_dbenv_destroy(dbenv)
+ DB_ENV *dbenv;
+{
+ if (dbenv->rep_handle != NULL) {
+#ifdef HAVE_REPLICATION_THREADS
+ __repmgr_dbenv_destroy(dbenv, dbenv->rep_handle);
#endif
- {
- dbenv->rep_elect = __rep_elect;
- dbenv->rep_flush = __rep_flush;
- dbenv->rep_process_message = __rep_process_message;
- dbenv->rep_start = __rep_start;
- dbenv->rep_stat = __rep_stat_pp;
- dbenv->rep_stat_print = __rep_stat_print_pp;
- dbenv->get_rep_limit = __rep_get_limit;
- dbenv->set_rep_limit = __rep_set_limit;
- dbenv->set_rep_request = __rep_set_request;
- dbenv->set_rep_transport = __rep_set_rep_transport;
+ __os_free(dbenv, dbenv->rep_handle);
+ dbenv->rep_handle = NULL;
}
}
/*
- * __rep_open --
- * Replication-specific initialization of the DB_ENV structure.
+ * __rep_get_config --
+ * Return the replication subsystem configuration.
*
- * PUBLIC: int __rep_open __P((DB_ENV *));
+ * PUBLIC: int __rep_get_config __P((DB_ENV *, u_int32_t, int *));
*/
int
-__rep_open(dbenv)
+__rep_get_config(dbenv, which, onp)
DB_ENV *dbenv;
+ u_int32_t which;
+ int *onp;
{
DB_REP *db_rep;
+ REP *rep;
+ u_int32_t mapped;
+
+#undef OK_FLAGS
+#define OK_FLAGS \
+ (DB_REP_CONF_BULK | DB_REP_CONF_DELAYCLIENT | \
+ DB_REP_CONF_NOAUTOINIT | DB_REP_CONF_NOWAIT)
+
+ if (FLD_ISSET(which, ~OK_FLAGS))
+ return (__db_ferr(dbenv, "DB_ENV->rep_get_config", 0));
+
+ db_rep = dbenv->rep_handle;
+ ENV_NOT_CONFIGURED(
+ dbenv, db_rep->region, "DB_ENV->rep_get_config", DB_INIT_REP);
+
+ mapped = 0;
+ __rep_config_map(dbenv, &which, &mapped);
+ if (REP_ON(dbenv)) {
+ rep = db_rep->region;
+ if (FLD_ISSET(rep->config, mapped))
+ *onp = 1;
+ else
+ *onp = 0;
+ } else {
+ if (FLD_ISSET(db_rep->config, mapped))
+ *onp = 1;
+ else
+ *onp = 0;
+ }
+ return (0);
+}
+
+/*
+ * __rep_set_config --
+ * Configure the replication subsystem.
+ *
+ * PUBLIC: int __rep_set_config __P((DB_ENV *, u_int32_t, int));
+ */
+int
+__rep_set_config(dbenv, which, on)
+ DB_ENV *dbenv;
+ u_int32_t which;
+ int on;
+{
+ DB_LOG *dblp;
+ DB_REP *db_rep;
+ LOG *lp;
+ REP *rep;
+ REP_BULK bulk;
int ret;
+ u_int32_t mapped, orig;
- if ((ret = __os_calloc(dbenv, 1, sizeof(DB_REP), &db_rep)) != 0)
- return (ret);
- dbenv->rep_handle = db_rep;
- ret = __rep_region_init(dbenv);
+ ret = 0;
+
+#undef OK_FLAGS
+#define OK_FLAGS \
+ (DB_REP_CONF_BULK | DB_REP_CONF_DELAYCLIENT | \
+ DB_REP_CONF_NOAUTOINIT | DB_REP_CONF_NOWAIT)
+
+ if (FLD_ISSET(which, ~OK_FLAGS))
+ return (__db_ferr(dbenv, "DB_ENV->rep_set_config", 0));
+
+ db_rep = dbenv->rep_handle;
+ ENV_NOT_CONFIGURED(
+ dbenv, db_rep->region, "DB_ENV->rep_set_config", DB_INIT_REP);
+
+ mapped = 0;
+ __rep_config_map(dbenv, &which, &mapped);
+ if (REP_ON(dbenv)) {
+ rep = db_rep->region;
+ MUTEX_LOCK(dbenv, rep->mtx_clientdb);
+ REP_SYSTEM_LOCK(dbenv);
+ orig = rep->config;
+ if (on)
+ FLD_SET(rep->config, mapped);
+ else
+ FLD_CLR(rep->config, mapped);
+
+ /*
+ * Bulk transfer requires special processing if it is getting
+ * toggled.
+ */
+ dblp = dbenv->lg_handle;
+ lp = dblp->reginfo.primary;
+ if (FLD_ISSET(rep->config, REP_C_BULK) &&
+ !FLD_ISSET(orig, REP_C_BULK))
+ db_rep->bulk = R_ADDR(&dblp->reginfo, lp->bulk_buf);
+ REP_SYSTEM_UNLOCK(dbenv);
+
+ /*
+ * If turning bulk off and it was on, send out whatever is in
+ * the buffer already.
+ */
+ if (FLD_ISSET(orig, REP_C_BULK) &&
+ !FLD_ISSET(rep->config, REP_C_BULK) && lp->bulk_off != 0) {
+ memset(&bulk, 0, sizeof(bulk));
+ if (db_rep->bulk == NULL)
+ bulk.addr =
+ R_ADDR(&dblp->reginfo, lp->bulk_buf);
+ else
+ bulk.addr = db_rep->bulk;
+ bulk.offp = &lp->bulk_off;
+ bulk.len = lp->bulk_len;
+ bulk.type = REP_BULK_LOG;
+ bulk.eid = DB_EID_BROADCAST;
+ bulk.flagsp = &lp->bulk_flags;
+ ret = __rep_send_bulk(dbenv, &bulk, 0);
+ }
+ MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
+ } else {
+ if (on)
+ FLD_SET(db_rep->config, mapped);
+ else
+ FLD_CLR(db_rep->config, mapped);
+ }
return (ret);
}
+static void
+__rep_config_map(dbenv, inflagsp, outflagsp)
+ DB_ENV *dbenv;
+ u_int32_t *inflagsp, *outflagsp;
+{
+ COMPQUIET(dbenv, NULL);
+
+ if (FLD_ISSET(*inflagsp, DB_REP_CONF_BULK)) {
+ FLD_SET(*outflagsp, REP_C_BULK);
+ FLD_CLR(*inflagsp, DB_REP_CONF_BULK);
+ }
+ if (FLD_ISSET(*inflagsp, DB_REP_CONF_DELAYCLIENT)) {
+ FLD_SET(*outflagsp, REP_C_DELAYCLIENT);
+ FLD_CLR(*inflagsp, DB_REP_CONF_DELAYCLIENT);
+ }
+ if (FLD_ISSET(*inflagsp, DB_REP_CONF_NOAUTOINIT)) {
+ FLD_SET(*outflagsp, REP_C_NOAUTOINIT);
+ FLD_CLR(*inflagsp, DB_REP_CONF_NOAUTOINIT);
+ }
+ if (FLD_ISSET(*inflagsp, DB_REP_CONF_NOWAIT)) {
+ FLD_SET(*outflagsp, REP_C_NOWAIT);
+ FLD_CLR(*inflagsp, DB_REP_CONF_NOWAIT);
+ }
+}
+
/*
* __rep_start --
* Become a master or client, and start sending messages to participate
@@ -121,7 +238,8 @@ __rep_open(dbenv)
* the library. Rep_start checks the following:
*
* rep->msg_th - this is the count of threads currently in rep_process_message
- * rep->start_th - this is set if a thread is in rep_start.
+ * rep->lockout_th - this is set if a thread is in rep_start or another
+ * operation requiring lockout with rep_proc_msg threads.
* rep->handle_cnt - number of threads actively using a dbp in library.
* rep->txn_cnt - number of active txns.
* REP_F_READY - Replication flag that indicates that we wish to run
@@ -135,8 +253,10 @@ __rep_open(dbenv)
* stored in the replication region. This prevents the use of handles on
* clients that reference non-existent files whose creation was backed out
* during a synchronizing recovery.
+ *
+ * PUBLIC: int __rep_start __P((DB_ENV *, DBT *, u_int32_t));
*/
-static int
+int
__rep_start(dbenv, dbt, flags)
DB_ENV *dbenv;
DBT *dbt;
@@ -145,20 +265,22 @@ __rep_start(dbenv, dbt, flags)
DB_LOG *dblp;
DB_LSN lsn;
DB_REP *db_rep;
+ LOG *lp;
REP *rep;
- u_int32_t repflags;
- int announce, init_db, redo_prepared, ret, role_chg;
- int sleep_cnt, t_ret;
+ u_int32_t oldvers, pending_event, repflags;
+ int announce, init_db, locked, redo_prepared, ret, role_chg;
+ int t_ret;
#ifdef DIAGNOSTIC
DB_MSGBUF mb;
#endif
PANIC_CHECK(dbenv);
- ENV_ILLEGAL_BEFORE_OPEN(dbenv, "DB_ENV->rep_start");
- ENV_REQUIRES_CONFIG(dbenv, dbenv->rep_handle, "rep_start", DB_INIT_REP);
+ ENV_REQUIRES_CONFIG_XX(
+ dbenv, rep_handle, "DB_ENV->rep_start", DB_INIT_REP);
db_rep = dbenv->rep_handle;
rep = db_rep->region;
+ locked = 0;
if ((ret = __db_fchk(dbenv, "DB_ENV->rep_start", flags,
DB_REP_CLIENT | DB_REP_MASTER)) != 0)
@@ -169,83 +291,78 @@ __rep_start(dbenv, dbt, flags)
"DB_ENV->rep_start", flags, DB_REP_CLIENT, DB_REP_MASTER)) != 0)
return (ret);
if (!LF_ISSET(DB_REP_CLIENT | DB_REP_MASTER)) {
- __db_err(dbenv,
+ __db_errx(dbenv,
"DB_ENV->rep_start: replication mode must be specified");
return (EINVAL);
}
/* We need a transport function. */
- if (dbenv->rep_send == NULL) {
- __db_err(dbenv,
- "DB_ENV->set_rep_transport must be called before DB_ENV->rep_start");
+ if (db_rep->send == NULL) {
+ __db_errx(dbenv,
+ "DB_ENV->rep_set_transport must be called before DB_ENV->rep_start");
return (EINVAL);
}
/*
- * If we are about to become (or stay) a master. Let's flush the log
- * to close any potential holes that might happen when upgrading from
- * client to master status.
+ * In order to correctly check log files for old versions, we
+ * need to flush the logs.
*/
- if (LF_ISSET(DB_REP_MASTER) && (ret = __log_flush(dbenv, NULL)) != 0)
+ if ((ret = __log_flush(dbenv, NULL)) != 0)
return (ret);
- MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
+ pending_event = DB_EVENT_NO_SUCH_EVENT;
+ REP_SYSTEM_LOCK(dbenv);
/*
* We only need one thread to start-up replication, so if
* there is another thread in rep_start, we'll let it finish
- * its work and have this thread simply return.
+ * its work and have this thread simply return. Similarly,
+ * if a thread is in a critical lockout section we return.
*/
- if (rep->start_th != 0) {
+ if (rep->lockout_th != 0) {
/*
- * There is already someone in rep_start. Return.
+ * There is already someone in lockout. Return.
*/
- RPRINT(dbenv, rep, (dbenv, &mb, "Thread already in rep_start"));
+ RPRINT(dbenv, (dbenv, &mb, "Thread already in lockout"));
goto err;
- } else
- rep->start_th = 1;
+ } else if ((ret = __rep_lockout_msg(dbenv, rep, 0)) != 0)
+ goto errunlock;
- role_chg = (F_ISSET(rep, REP_F_CLIENT) && LF_ISSET(DB_REP_MASTER)) ||
- (F_ISSET(rep, REP_F_MASTER) && LF_ISSET(DB_REP_CLIENT));
+ role_chg = (!F_ISSET(rep, REP_F_MASTER) && LF_ISSET(DB_REP_MASTER)) ||
+ (!F_ISSET(rep, REP_F_CLIENT) && LF_ISSET(DB_REP_CLIENT));
/*
* Wait for any active txns or mpool ops to complete, and
* prevent any new ones from occurring, only if we're
- * changing roles. If we are not changing roles, then we
- * only need to coordinate with msg_th.
+ * changing roles.
*/
- if (role_chg)
- __rep_lockout(dbenv, db_rep, rep, 0);
- else {
- for (sleep_cnt = 0; rep->msg_th != 0;) {
- if (++sleep_cnt % 60 == 0)
- __db_err(dbenv,
- "DB_ENV->rep_start waiting %d minutes for replication message thread",
- sleep_cnt / 60);
- MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
- __os_sleep(dbenv, 1, 0);
- MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
- }
+ if (role_chg) {
+ if ((ret = __rep_lockout_api(dbenv, rep)) != 0)
+ goto errunlock;
+ locked = 1;
}
- if (rep->eid == DB_EID_INVALID)
- rep->eid = dbenv->rep_eid;
-
if (LF_ISSET(DB_REP_MASTER)) {
if (role_chg) {
/*
* If we're upgrading from having been a client,
- * preclose, so that we close our temporary database.
- *
- * Do not close files that we may have opened while
- * doing a rep_apply; they'll get closed when we
- * finally close the environment, but for now, leave
- * them open, as we don't want to recycle their
- * fileids, and we may need the handles again if
- * we become a client and the original master
- * that opened them becomes a master again.
+ * preclose, so that we close our temporary database
+ * and any files we opened while doing a rep_apply.
+ * If we don't, we can leak file ids without bound
+ * when the master crashed with files open (the
+ * likely case). Leaving them open also causes
+ * problems if we later try to remove such a file,
+ * and long-running applications end up with an
+ * unbounded number of used fileids, each getting
+ * written on checkpoint. Just close them.
+ * Then invalidate all files open in the logging
+ * region. These are files opened by other processes
+ * attached to the environment. They must be
+ * closed by the other processes when they notice
+ * the change in role.
*/
- if ((ret = __rep_preclose(dbenv, 0)) != 0)
+ if ((ret = __rep_preclose(dbenv)) != 0)
goto errunlock;
+
}
redo_prepared = 0;
@@ -273,38 +390,68 @@ __rep_start(dbenv, dbt, flags)
}
if (rep->egen <= rep->gen)
rep->egen = rep->gen + 1;
- RPRINT(dbenv, rep, (dbenv, &mb,
+ RPRINT(dbenv, (dbenv, &mb,
"New master gen %lu, egen %lu",
(u_long)rep->gen, (u_long)rep->egen));
}
rep->master_id = rep->eid;
/*
- * Note, setting flags below implicitly clears out
- * REP_F_NOARCHIVE, REP_F_INIT and REP_F_READY.
+ * Clear out almost everything, and then set MASTER. Leave
+ * READY alone in case we did a lockout above; we'll clear it in
+ * a moment (below), once we've written the txn_recycle into the
+ * log.
*/
- rep->flags = REP_F_MASTER;
- rep->start_th = 0;
- MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
+ repflags = F_ISSET(rep, REP_F_READY);
+ FLD_SET(repflags, REP_F_MASTER);
+ rep->flags = repflags;
+
dblp = (DB_LOG *)dbenv->lg_handle;
- R_LOCK(dbenv, &dblp->reginfo);
- lsn = ((LOG *)dblp->reginfo.primary)->lsn;
- R_UNLOCK(dbenv, &dblp->reginfo);
+ lp = dblp->reginfo.primary;
+ /*
+ * We're master. Set the versions to the current ones.
+ */
+ oldvers = lp->persist.version;
+ /*
+ * If we're moving forward to the current version, we need
+ * to force the log file to advance and reset the
+ * recovery table since it contains pointers to old
+ * recovery functions.
+ */
+ RPRINT(dbenv, (dbenv, &mb,
+ "rep_start: Old log version was %lu", (u_long)oldvers));
+ if (lp->persist.version != DB_LOGVERSION) {
+ if ((ret = __env_init_rec(dbenv, DB_LOGVERSION)) != 0)
+ goto errunlock;
+ }
+ rep->version = DB_REPVERSION;
+ rep->lockout_th = 0;
+ REP_SYSTEM_UNLOCK(dbenv);
+ LOG_SYSTEM_LOCK(dbenv);
+ lsn = lp->lsn;
+ LOG_SYSTEM_UNLOCK(dbenv);
/*
* Send the NEWMASTER message first so that clients know
* subsequent messages are coming from the right master.
- * We need to perform all actions below no master what
+ * We need to perform all actions below no matter what
* regarding errors.
*/
(void)__rep_send_message(dbenv,
- DB_EID_BROADCAST, REP_NEWMASTER, &lsn, NULL, 0);
+ DB_EID_BROADCAST, REP_NEWMASTER, &lsn, NULL, 0, 0);
ret = 0;
if (role_chg) {
- ret = __txn_reset(dbenv);
- MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
+ pending_event = DB_EVENT_REP_MASTER;
+ ret = __dbreg_invalidate_files(dbenv);
+ if ((t_ret = __rep_closefiles(dbenv)) != 0 && ret == 0)
+ ret = t_ret;
+ if ((t_ret = __txn_reset(dbenv)) != 0 && ret == 0)
+ ret = t_ret;
+ DB_ENV_TEST_RECYCLE(dbenv, ret);
+ REP_SYSTEM_LOCK(dbenv);
F_CLR(rep, REP_F_READY);
rep->in_recovery = 0;
- MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
+ locked = 0;
+ REP_SYSTEM_UNLOCK(dbenv);
}
/*
* Take a transaction checkpoint so that our new generation
@@ -321,10 +468,9 @@ __rep_start(dbenv, dbt, flags)
announce = role_chg || rep->master_id == DB_EID_INVALID;
/*
- * If we're changing roles from master to client or if
- * we never were any role at all, we need to init the db.
+ * If we're changing roles we need to init the db.
*/
- if (role_chg || !F_ISSET(rep, REP_F_CLIENT)) {
+ if (role_chg) {
rep->master_id = DB_EID_INVALID;
init_db = 1;
}
@@ -332,9 +478,18 @@ __rep_start(dbenv, dbt, flags)
repflags = F_ISSET(rep, REP_F_NOARCHIVE |
REP_F_RECOVER_MASK | REP_F_TALLY);
FLD_SET(repflags, REP_F_CLIENT);
-
+ if ((ret = __log_get_oldversion(dbenv, &oldvers)) != 0)
+ goto errunlock;
+ RPRINT(dbenv, (dbenv, &mb,
+ "rep_start: Found old version log %d", oldvers));
+ if (oldvers >= DB_LOGVERSION_42) {
+ __log_set_version(dbenv, oldvers);
+ oldvers = __rep_conv_vers(dbenv, oldvers);
+ DB_ASSERT(dbenv, oldvers != DB_REPVERSION_INVALID);
+ rep->version = oldvers;
+ }
rep->flags = repflags;
- MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
+ REP_SYSTEM_UNLOCK(dbenv);
/*
* Abort any prepared transactions that were restored
@@ -347,18 +502,21 @@ __rep_start(dbenv, dbt, flags)
if ((ret = __rep_abort_prepared(dbenv)) != 0)
goto errlock;
- MUTEX_LOCK(dbenv, db_rep->db_mutexp);
+ MUTEX_LOCK(dbenv, rep->mtx_clientdb);
ret = __rep_client_dbinit(dbenv, init_db, REP_DB);
- MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
+ MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
if (ret != 0)
goto errlock;
- MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
- rep->start_th = 0;
- if (role_chg) {
+ if (role_chg)
+ pending_event = DB_EVENT_REP_CLIENT;
+ REP_SYSTEM_LOCK(dbenv);
+ rep->lockout_th = 0;
+ if (locked) {
F_CLR(rep, REP_F_READY);
rep->in_recovery = 0;
+ locked = 0;
}
- MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
+ REP_SYSTEM_UNLOCK(dbenv);
/*
* If this client created a newly replicated environment,
@@ -370,30 +528,31 @@ __rep_start(dbenv, dbt, flags)
*/
if (announce)
(void)__rep_send_message(dbenv,
- DB_EID_BROADCAST, REP_NEWCLIENT, NULL, dbt, 0);
+ DB_EID_BROADCAST, REP_NEWCLIENT, NULL, dbt, 0, 0);
else
(void)__rep_send_message(dbenv,
- DB_EID_BROADCAST, REP_ALIVE_REQ, NULL, NULL, 0);
+ DB_EID_BROADCAST, REP_ALIVE_REQ, NULL, NULL, 0, 0);
}
if (0) {
/*
* We have separate labels for errors. If we're returning an
- * error before we've set start_th, we use 'err'. If
- * we are erroring while holding the rep_mutex, then we use
+ * error before we've set lockout_th, we use 'err'. If
+ * we are erroring while holding the region mutex, then we use
* 'errunlock' label. If we're erroring without holding the rep
* mutex we must use 'errlock'.
*/
-errlock:
- MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
-errunlock:
- rep->start_th = 0;
- if (role_chg) {
+DB_TEST_RECOVERY_LABEL
+errlock: REP_SYSTEM_LOCK(dbenv);
+errunlock: rep->lockout_th = 0;
+ if (locked) {
F_CLR(rep, REP_F_READY);
rep->in_recovery = 0;
}
-err: MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
+err: REP_SYSTEM_UNLOCK(dbenv);
}
+ if (pending_event != DB_EVENT_NO_SUCH_EVENT)
+ DB_EVENT(dbenv, pending_event, NULL);
return (ret);
}
@@ -428,9 +587,6 @@ __rep_client_dbinit(dbenv, startup, which)
rep = db_rep->region;
dbp = NULL;
-#define REPDBNAME "__db.rep.db"
-#define REPPAGENAME "__db.reppg.db"
-
if (which == REP_DB) {
name = REPDBNAME;
rdbpp = &db_rep->rep_db;
@@ -443,7 +599,7 @@ __rep_client_dbinit(dbenv, startup, which)
return (0);
if (startup) {
- if ((ret = db_create(&dbp, dbenv, DB_REP_CREATE)) != 0)
+ if ((ret = db_create(&dbp, dbenv, 0)) != 0)
goto err;
/*
* Ignore errors, because if the file doesn't exist, this
@@ -452,7 +608,7 @@ __rep_client_dbinit(dbenv, startup, which)
(void)__db_remove(dbp, NULL, name, NULL, DB_FORCE);
}
- if ((ret = db_create(&dbp, dbenv, DB_REP_CREATE)) != 0)
+ if ((ret = db_create(&dbp, dbenv, 0)) != 0)
goto err;
if (which == REP_DB &&
(ret = __bam_set_bt_compare(dbp, __rep_bt_cmp)) != 0)
@@ -545,10 +701,10 @@ __rep_abort_prepared(dbenv)
region = mgr->reginfo.primary;
do_aborts = 0;
- R_LOCK(dbenv, &mgr->reginfo);
+ TXN_SYSTEM_LOCK(dbenv);
if (region->stat.st_nrestores != 0)
do_aborts = 1;
- R_UNLOCK(dbenv, &mgr->reginfo);
+ TXN_SYSTEM_UNLOCK(dbenv);
if (do_aborts) {
op = DB_FIRST;
@@ -584,18 +740,26 @@ __rep_restore_prepared(dbenv)
{
DB_LOGC *logc;
DB_LSN ckp_lsn, lsn;
+ DB_REP *db_rep;
+ DB_TXNHEAD *txninfo;
DBT rec;
+ REP *rep;
__txn_ckp_args *ckp_args;
+ __txn_ckp_42_args *ckp42_args;
__txn_regop_args *regop_args;
+ __txn_regop_42_args *regop42_args;
__txn_xa_regop_args *prep_args;
int ret, t_ret;
- u_int32_t hi_txn, low_txn, rectype, status;
- void *txninfo;
+ u_int32_t hi_txn, low_txn, rectype, status, txnid, txnop;
+ db_rep = dbenv->rep_handle;
+ rep = db_rep->region;
txninfo = NULL;
ckp_args = NULL;
+ ckp42_args = NULL;
prep_args = NULL;
regop_args = NULL;
+ regop42_args = NULL;
ZERO_LSN(ckp_lsn);
ZERO_LSN(lsn);
@@ -616,24 +780,34 @@ __rep_restore_prepared(dbenv)
memset(&rec, 0, sizeof(DBT));
if ((ret = __txn_getckp(dbenv, &lsn)) == 0) {
if ((ret = __log_c_get(logc, &lsn, &rec, DB_SET)) != 0) {
- __db_err(dbenv,
+ __db_errx(dbenv,
"Checkpoint record at LSN [%lu][%lu] not found",
(u_long)lsn.file, (u_long)lsn.offset);
goto err;
}
- if ((ret = __txn_ckp_read(dbenv, rec.data, &ckp_args)) != 0) {
- __db_err(dbenv,
+ if (rep->version >= DB_REPVERSION_43) {
+ if ((ret = __txn_ckp_read(dbenv, rec.data,
+ &ckp_args)) == 0) {
+ ckp_lsn = ckp_args->ckp_lsn;
+ __os_free(dbenv, ckp_args);
+ }
+ } else {
+ if ((ret = __txn_ckp_42_read(dbenv, rec.data,
+ &ckp42_args)) == 0) {
+ ckp_lsn = ckp42_args->ckp_lsn;
+ __os_free(dbenv, ckp42_args);
+ }
+ }
+ if (ret != 0) {
+ __db_errx(dbenv,
"Invalid checkpoint record at [%lu][%lu]",
(u_long)lsn.file, (u_long)lsn.offset);
goto err;
}
- ckp_lsn = ckp_args->ckp_lsn;
- __os_free(dbenv, ckp_args);
-
if ((ret = __log_c_get(logc, &ckp_lsn, &rec, DB_SET)) != 0) {
- __db_err(dbenv,
+ __db_errx(dbenv,
"Checkpoint LSN record [%lu][%lu] not found",
(u_long)ckp_lsn.file, (u_long)ckp_lsn.offset);
goto err;
@@ -644,7 +818,7 @@ __rep_restore_prepared(dbenv)
ret = 0;
goto done;
}
- __db_err(dbenv, "Attempt to get first log record failed");
+ __db_errx(dbenv, "Attempt to get first log record failed");
goto err;
}
@@ -676,7 +850,7 @@ __rep_restore_prepared(dbenv)
* Note that DB_NOTFOUND is unacceptable here because we
* had to have looked at some log record to get this far.
*/
- __db_err(dbenv, "Final log record not found");
+ __db_errx(dbenv, "Final log record not found");
goto err;
}
do {
@@ -708,7 +882,7 @@ __rep_restore_prepared(dbenv)
* were isolated, this should be safe.
*/
for (ret = __log_c_get(logc, &lsn, &rec, DB_LAST);
- ret == 0 && log_compare(&lsn, &ckp_lsn) > 0;
+ ret == 0 && LOG_COMPARE(&lsn, &ckp_lsn) > 0;
ret = __log_c_get(logc, &lsn, &rec, DB_PREV)) {
memcpy(&rectype, rec.data, sizeof(rectype));
switch (rectype) {
@@ -718,19 +892,29 @@ __rep_restore_prepared(dbenv)
* which! Just add it to the list of txns
* that are resolved.
*/
- if ((ret = __txn_regop_read(dbenv, rec.data,
- &regop_args)) != 0)
- goto err;
+ if (rep->version >= DB_REPVERSION_44) {
+ if ((ret = __txn_regop_read(dbenv, rec.data,
+ &regop_args)) != 0)
+ goto err;
+ txnid = regop_args->txnp->txnid;
+ txnop = regop_args->opcode;
+ __os_free(dbenv, regop_args);
+ } else {
+ if ((ret = __txn_regop_42_read(dbenv, rec.data,
+ &regop42_args)) != 0)
+ goto err;
+ txnid = regop42_args->txnp->txnid;
+ txnop = regop42_args->opcode;
+ __os_free(dbenv, regop42_args);
+ }
ret = __db_txnlist_find(dbenv,
- txninfo, regop_args->txnid->txnid, &status);
+ txninfo, txnid, &status);
if (ret == DB_NOTFOUND)
ret = __db_txnlist_add(dbenv, txninfo,
- regop_args->txnid->txnid,
- regop_args->opcode, &lsn);
+ txnid, txnop, &lsn);
else if (ret != 0)
goto err;
- __os_free(dbenv, regop_args);
break;
case DB___txn_xa_regop:
/*
@@ -742,11 +926,11 @@ __rep_restore_prepared(dbenv)
&prep_args)) != 0)
goto err;
ret = __db_txnlist_find(dbenv, txninfo,
- prep_args->txnid->txnid, &status);
+ prep_args->txnp->txnid, &status);
if (ret == DB_NOTFOUND) {
if (prep_args->opcode == TXN_ABORT)
ret = __db_txnlist_add(dbenv, txninfo,
- prep_args->txnid->txnid,
+ prep_args->txnp->txnid,
prep_args->opcode, &lsn);
else if ((ret =
__rep_process_txn(dbenv, &rec)) == 0)
@@ -774,7 +958,14 @@ err: t_ret = __log_c_close(logc);
return (ret == 0 ? t_ret : ret);
}
-static int
+/*
+ * __rep_get_limit --
+ * Get the limit on the amount of data that will be sent during a single
+ * invocation of __rep_process_message.
+ *
+ * PUBLIC: int __rep_get_limit __P((DB_ENV *, u_int32_t *, u_int32_t *));
+ */
+int
__rep_get_limit(dbenv, gbytesp, bytesp)
DB_ENV *dbenv;
u_int32_t *gbytesp, *bytesp;
@@ -782,22 +973,24 @@ __rep_get_limit(dbenv, gbytesp, bytesp)
DB_REP *db_rep;
REP *rep;
- PANIC_CHECK(dbenv);
- ENV_REQUIRES_CONFIG(dbenv, dbenv->rep_handle, "rep_get_limit",
- DB_INIT_REP);
-
- if (!REP_ON(dbenv)) {
- __db_err(dbenv,
- "DB_ENV->get_rep_limit: database environment not properly initialized");
- return (__db_panic(dbenv, EINVAL));
- }
db_rep = dbenv->rep_handle;
- rep = db_rep->region;
-
- if (gbytesp != NULL)
- *gbytesp = rep->gbytes;
- if (bytesp != NULL)
- *bytesp = rep->bytes;
+ ENV_NOT_CONFIGURED(
+ dbenv, db_rep->region, "DB_ENV->rep_get_limit", DB_INIT_REP);
+
+ if (REP_ON(dbenv)) {
+ rep = db_rep->region;
+ REP_SYSTEM_LOCK(dbenv);
+ if (gbytesp != NULL)
+ *gbytesp = rep->gbytes;
+ if (bytesp != NULL)
+ *bytesp = rep->bytes;
+ REP_SYSTEM_UNLOCK(dbenv);
+ } else {
+ if (gbytesp != NULL)
+ *gbytesp = db_rep->gbytes;
+ if (bytesp != NULL)
+ *bytesp = db_rep->bytes;
+ }
return (0);
}
@@ -806,8 +999,10 @@ __rep_get_limit(dbenv, gbytesp, bytesp)
* __rep_set_limit --
* Set a limit on the amount of data that will be sent during a single
* invocation of __rep_process_message.
+ *
+ * Whole gigabytes contained in "bytes" are first folded into "gbytes".
+ * The pair is then stored in the region structure between
+ * REP_SYSTEM_LOCK and REP_SYSTEM_UNLOCK when REP_ON(dbenv) is true, or
+ * in the DB_REP handle otherwise. Returns 0 once past the
+ * ENV_NOT_CONFIGURED check (macro body not visible here).
+ *
+ * NOTE(review): the fold tests bytes > GIGABYTE, so bytes == GIGABYTE is
+ * stored as given rather than as one more gigabyte -- confirm intent.
+ *
+ * PUBLIC: int __rep_set_limit __P((DB_ENV *, u_int32_t, u_int32_t));
*/
-static int
+int
__rep_set_limit(dbenv, gbytes, bytes)
DB_ENV *dbenv;
u_int32_t gbytes, bytes;
@@ -815,448 +1010,178 @@ __rep_set_limit(dbenv, gbytes, bytes)
DB_REP *db_rep;
REP *rep;
- PANIC_CHECK(dbenv);
- ENV_ILLEGAL_BEFORE_OPEN(dbenv, "DB_ENV->rep_set_limit");
- ENV_REQUIRES_CONFIG(dbenv, dbenv->rep_handle, "rep_set_limit",
- DB_INIT_REP);
-
- if (!REP_ON(dbenv)) {
- __db_err(dbenv,
- "DB_ENV->set_rep_limit: database environment not properly initialized");
- return (__db_panic(dbenv, EINVAL));
- }
db_rep = dbenv->rep_handle;
- rep = db_rep->region;
- MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
+ ENV_NOT_CONFIGURED(
+ dbenv, db_rep->region, "DB_ENV->rep_set_limit", DB_INIT_REP);
+
if (bytes > GIGABYTE) {
gbytes += bytes / GIGABYTE;
bytes = bytes % GIGABYTE;
}
- rep->gbytes = gbytes;
- rep->bytes = bytes;
- MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
+
+ if (REP_ON(dbenv)) {
+ rep = db_rep->region;
+ REP_SYSTEM_LOCK(dbenv);
+ rep->gbytes = gbytes;
+ rep->bytes = bytes;
+ REP_SYSTEM_UNLOCK(dbenv);
+ } else {
+ db_rep->gbytes = gbytes;
+ db_rep->bytes = bytes;
+ }
return (0);
}
/*
- * __rep_set_request --
- * Set the minimum and maximum number of log records that we wait
- * before retransmitting.
- * UNDOCUMENTED.
+ * __rep_set_nsites --
+ * Record the caller-supplied site count in config_nsites: in the
+ * region structure when REP_ON(dbenv) is true, otherwise in the DB_REP
+ * handle. A value <= 0 is reported and rejected with EINVAL;
+ * otherwise 0 is returned.
+ *
+ * NOTE(review): the region field is written without REP_SYSTEM_LOCK,
+ * unlike __rep_set_limit -- confirm whether that is safe.
+ *
+ * PUBLIC: int __rep_set_nsites __P((DB_ENV *, int));
*/
-static int
-__rep_set_request(dbenv, min, max)
+int
+__rep_set_nsites(dbenv, n)
DB_ENV *dbenv;
- u_int32_t min, max;
+ int n;
{
- LOG *lp;
- DB_LOG *dblp;
DB_REP *db_rep;
REP *rep;
- PANIC_CHECK(dbenv);
- ENV_ILLEGAL_BEFORE_OPEN(dbenv, "DB_ENV->rep_set_request");
- ENV_REQUIRES_CONFIG(dbenv, dbenv->rep_handle, "rep_set_request",
- DB_INIT_REP);
-
- if (!REP_ON(dbenv)) {
- __db_err(dbenv,
- "DB_ENV->set_rep_request: database environment not properly initialized");
- return (__db_panic(dbenv, EINVAL));
+ if (n <= 0) {
+ __db_errx(dbenv,
+ "DB_ENV->rep_set_nsites: nsites must be a positive number");
+ return (EINVAL);
}
+
db_rep = dbenv->rep_handle;
- rep = db_rep->region;
- /*
- * Note we acquire the rep_mutexp or the db_mutexp as needed.
- */
- MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
- rep->request_gap = min;
- rep->max_gap = max;
- MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
- MUTEX_LOCK(dbenv, db_rep->db_mutexp);
- dblp = dbenv->lg_handle;
- if (dblp != NULL && (lp = dblp->reginfo.primary) != NULL) {
- lp->wait_recs = 0;
- lp->rcvd_recs = 0;
- }
- MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
+ /* TODO: ENV_REQUIRES_CONFIG(... ) and/or ENV_NOT_CONFIGURED (?) */
+ if (REP_ON(dbenv)) {
+ rep = db_rep->region;
+ rep->config_nsites = n;
+ } else
+ db_rep->config_nsites = n;
return (0);
}
/*
- * __rep_set_transport --
- * Set the transport function for replication.
+ * __rep_get_nsites --
+ * Return config_nsites through *n: from the region structure when
+ * REP_ON(dbenv) is true, otherwise from the DB_REP handle. The output
+ * pointer is dereferenced unconditionally, so it must not be NULL.
+ * Always returns 0.
+ *
+ * PUBLIC: int __rep_get_nsites __P((DB_ENV *, int *));
*/
-static int
-__rep_set_rep_transport(dbenv, eid, f_send)
+int
+__rep_get_nsites(dbenv, n)
DB_ENV *dbenv;
- int eid;
- int (*f_send) __P((DB_ENV *, const DBT *, const DBT *, const DB_LSN *,
- int, u_int32_t));
+ int *n;
{
- PANIC_CHECK(dbenv);
+ DB_REP *db_rep;
+ REP *rep;
+
+ db_rep = dbenv->rep_handle;
+
+ /* TODO: ENV_REQUIRES_CONFIG(... ) and/or ENV_NOT_CONFIGURED (?) */
+
+ if (REP_ON(dbenv)) {
+ rep = db_rep->region;
+ *n = rep->config_nsites;
+ } else
+ *n = db_rep->config_nsites;
- if (f_send == NULL) {
- __db_err(dbenv,
- "DB_ENV->set_rep_transport: no send function specified");
- return (EINVAL);
- }
- if (eid < 0) {
- __db_err(dbenv,
- "DB_ENV->set_rep_transport: eid must be greater than or equal to 0");
- return (EINVAL);
- }
- dbenv->rep_send = f_send;
- dbenv->rep_eid = eid;
return (0);
}
/*
- * __rep_elect --
- * Called after master failure to hold/participate in an election for
- * a new master.
+ * __rep_set_priority --
+ * Store the given priority: in the region's "priority" field when
+ * REP_ON(dbenv) is true, otherwise in the DB_REP handle's my_priority
+ * field. A negative value is reported and rejected with EINVAL;
+ * otherwise 0 is returned.
+ *
+ * NOTE(review): the error text carries no "DB_ENV->rep_set_priority:"
+ * prefix, unlike the message in __rep_set_nsites -- confirm intent.
+ *
+ * PUBLIC: int __rep_set_priority __P((DB_ENV *, int));
*/
-static int
-__rep_elect(dbenv, nsites, nvotes, priority, timeout, eidp, flags)
+int
+__rep_set_priority(dbenv, priority)
DB_ENV *dbenv;
- int nsites, nvotes, priority;
- u_int32_t timeout;
- int *eidp;
- u_int32_t flags;
+ int priority;
{
- DB_LOG *dblp;
- DB_LSN lsn;
DB_REP *db_rep;
REP *rep;
- int ack, done, in_progress, ret, send_vote;
- u_int32_t egen, orig_tally, tiebreaker, to;
-#ifdef DIAGNOSTIC
- DB_MSGBUF mb;
-#endif
- PANIC_CHECK(dbenv);
- COMPQUIET(flags, 0);
- ENV_REQUIRES_CONFIG(dbenv, dbenv->rep_handle, "rep_elect", DB_INIT_REP);
-
- /* Error checking. */
- if (nsites <= 0) {
- __db_err(dbenv,
- "DB_ENV->rep_elect: nsites must be greater than 0");
- return (EINVAL);
- }
- if (nvotes < 0) {
- __db_err(dbenv,
- "DB_ENV->rep_elect: nvotes may not be negative");
- return (EINVAL);
- }
if (priority < 0) {
- __db_err(dbenv,
- "DB_ENV->rep_elect: priority may not be negative");
+ __db_errx(dbenv, "priority may not be negative");
return (EINVAL);
}
- if (nsites < nvotes) {
- __db_err(dbenv,
- "DB_ENV->rep_elect: nvotes (%d) is larger than nsites (%d)",
- nvotes, nsites);
- return (EINVAL);
- }
-
- ack = nvotes;
- /* If they give us a 0 for nvotes, default to simple majority. */
- if (nvotes == 0)
- ack = (nsites / 2) + 1;
-
- /*
- * XXX
- * If users give us less than a majority, they run the risk of
- * having a network partition. However, this also allows the
- * scenario of master/1 client to elect the client. Allow
- * sub-majority values, but give a warning.
- */
- if (nvotes <= (nsites / 2)) {
- __db_err(dbenv,
- "DB_ENV->rep_elect:WARNING: nvotes (%d) is sub-majority with nsites (%d)",
- nvotes, nsites);
- }
-
db_rep = dbenv->rep_handle;
- rep = db_rep->region;
- dblp = dbenv->lg_handle;
-
- RPRINT(dbenv, rep,
- (dbenv, &mb, "Start election nsites %d, ack %d, priority %d",
- nsites, ack, priority));
+ if (REP_ON(dbenv)) {
+ rep = db_rep->region;
+ rep->priority = priority;
+ } else
+ db_rep->my_priority = priority;
+ return (0);
+}
- R_LOCK(dbenv, &dblp->reginfo);
- lsn = ((LOG *)dblp->reginfo.primary)->lsn;
- R_UNLOCK(dbenv, &dblp->reginfo);
+/*
+ * __rep_get_priority --
+ * Return the priority through *priority: the region's "priority"
+ * field when REP_ON(dbenv) is true, otherwise the DB_REP handle's
+ * my_priority field. The output pointer is dereferenced
+ * unconditionally, so it must not be NULL. Always returns 0.
+ *
+ * PUBLIC: int __rep_get_priority __P((DB_ENV *, int *));
+ */
+int
+__rep_get_priority(dbenv, priority)
+ DB_ENV *dbenv;
+ int *priority;
+{
+ DB_REP *db_rep;
+ REP *rep;
- orig_tally = 0;
- to = timeout;
- if ((ret = __rep_elect_init(dbenv,
- &lsn, nsites, ack, priority, &in_progress, &orig_tally)) != 0) {
- if (ret == DB_REP_NEWMASTER) {
- ret = 0;
- *eidp = dbenv->rep_eid;
- }
- goto err;
- }
- /*
- * If another thread is in the middle of an election we
- * just quietly return and not interfere.
- */
- if (in_progress) {
- *eidp = rep->master_id;
- return (0);
- }
- (void)__rep_send_message(dbenv,
- DB_EID_BROADCAST, REP_MASTER_REQ, NULL, NULL, 0);
- ret = __rep_wait(dbenv, to/4, eidp, REP_F_EPHASE1);
- switch (ret) {
- case 0:
- /* Check if we found a master. */
- if (*eidp != DB_EID_INVALID) {
- RPRINT(dbenv, rep, (dbenv, &mb,
- "Found master %d", *eidp));
- goto edone;
- }
- /*
- * If we didn't find a master, continue
- * the election.
- */
- break;
- case DB_REP_EGENCHG:
- /*
- * Egen changed, just continue with election.
- */
- break;
- case DB_TIMEOUT:
- RPRINT(dbenv, rep, (dbenv, &mb,
- "Did not find master. Sending vote1"));
- break;
- default:
- goto err;
- }
-restart:
- /* Generate a randomized tiebreaker value. */
- __os_unique_id(dbenv, &tiebreaker);
+ db_rep = dbenv->rep_handle;
+ if (REP_ON(dbenv)) {
+ rep = db_rep->region;
+ *priority = rep->priority;
+ } else
+ *priority = db_rep->my_priority;
+ return (0);
+}
- MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
- F_SET(rep, REP_F_EPHASE1 | REP_F_NOARCHIVE);
- F_CLR(rep, REP_F_TALLY);
+/*
+ * __rep_set_timeout --
+ * Set the replication timeout selected by "which".
+ * DB_REP_ELECTION_TIMEOUT is stored in the region's elect_timeout when
+ * REP_ON(dbenv) is true, otherwise in the DB_REP handle. The
+ * DB_REP_ACK_TIMEOUT, DB_REP_ELECTION_RETRY and DB_REP_CONNECTION_RETRY
+ * selectors are only compiled in under HAVE_REPLICATION_THREADS and are
+ * always stored in the DB_REP handle. Any other selector is reported
+ * and EINVAL is returned; otherwise the result is 0.
+ *
+ * PUBLIC: int __rep_set_timeout __P((DB_ENV *, int, db_timeout_t));
+ */
+int
+__rep_set_timeout(dbenv, which, timeout)
+ DB_ENV *dbenv;
+ int which;
+ db_timeout_t timeout;
+{
+ DB_REP *db_rep;
+ REP *rep;
+ int ret;
- /*
- * We are about to participate at this egen. We must
- * write out the next egen before participating in this one
- * so that if we crash we can never participate in this egen
- * again.
- */
- if ((ret = __rep_write_egen(dbenv, rep->egen + 1)) != 0)
- goto lockdone;
+ db_rep = dbenv->rep_handle;
+ rep = db_rep->region;
+ ret = 0;
- /* Tally our own vote */
- if (__rep_tally(dbenv, rep, rep->eid, &rep->sites, rep->egen,
- rep->tally_off) != 0) {
+ switch (which) {
+ case DB_REP_ELECTION_TIMEOUT:
+ if (REP_ON(dbenv))
+ rep->elect_timeout = timeout;
+ else
+ db_rep->elect_timeout = timeout;
+ break;
+#ifdef HAVE_REPLICATION_THREADS
+ case DB_REP_ACK_TIMEOUT:
+ db_rep->ack_timeout = timeout;
+ break;
+ case DB_REP_ELECTION_RETRY:
+ db_rep->election_retry_wait = timeout;
+ break;
+ case DB_REP_CONNECTION_RETRY:
+ db_rep->connection_retry_wait = timeout;
+ break;
+#endif
+ default:
+ __db_errx(dbenv,
+ "Unknown timeout type argument to DB_ENV->rep_set_timeout");
ret = EINVAL;
- goto lockdone;
- }
- __rep_cmp_vote(dbenv, rep, &rep->eid, &lsn, priority, rep->gen,
- tiebreaker);
-
- RPRINT(dbenv, rep, (dbenv, &mb, "Beginning an election"));
-
- /* Now send vote */
- send_vote = DB_EID_INVALID;
- egen = rep->egen;
- MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
- __rep_send_vote(dbenv, &lsn, nsites, ack, priority, tiebreaker, egen,
- DB_EID_BROADCAST, REP_VOTE1);
- DB_ENV_TEST_RECOVERY(dbenv, DB_TEST_ELECTVOTE1, ret, NULL);
- ret = __rep_wait(dbenv, to, eidp, REP_F_EPHASE1);
- switch (ret) {
- case 0:
- /* Check if election complete or phase complete. */
- if (*eidp != DB_EID_INVALID) {
- RPRINT(dbenv, rep, (dbenv, &mb,
- "Ended election phase 1 %d", ret));
- goto edone;
- }
- goto phase2;
- case DB_REP_EGENCHG:
- if (to > timeout)
- to = timeout;
- to = (to * 8) / 10;
- RPRINT(dbenv, rep, (dbenv, &mb,
-"Egen changed while waiting. Now %lu. New timeout %lu, orig timeout %lu",
- (u_long)rep->egen, (u_long)to, (u_long)timeout));
- /*
- * If the egen changed while we were sleeping, that
- * means we're probably late to the next election,
- * so we'll backoff our timeout so that we don't get
- * into an out-of-phase election scenario.
- *
- * Backoff to 80% of the current timeout.
- */
- goto restart;
- case DB_TIMEOUT:
- break;
- default:
- goto err;
- }
- /*
- * If we got here, we haven't heard from everyone, but we've
- * run out of time, so it's time to decide if we have enough
- * votes to pick a winner and if so, to send out a vote to
- * the winner.
- */
- MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
- /*
- * If our egen changed while we were waiting. We need to
- * essentially reinitialize our election.
- */
- if (egen != rep->egen) {
- MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
- RPRINT(dbenv, rep, (dbenv, &mb, "Egen changed from %lu to %lu",
- (u_long)egen, (u_long)rep->egen));
- goto restart;
}
- if (rep->sites >= rep->nvotes) {
-
- /* We think we've seen enough to cast a vote. */
- send_vote = rep->winner;
- /*
- * See if we won. This will make sure we
- * don't count ourselves twice if we're racing
- * with incoming votes.
- */
- if (rep->winner == rep->eid) {
- (void)__rep_tally(dbenv, rep, rep->eid, &rep->votes,
- egen, rep->v2tally_off);
- RPRINT(dbenv, rep, (dbenv, &mb,
- "Counted my vote %d", rep->votes));
- }
- F_SET(rep, REP_F_EPHASE2);
- F_CLR(rep, REP_F_EPHASE1);
- }
- MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
- if (send_vote == DB_EID_INVALID) {
- /* We do not have enough votes to elect. */
- RPRINT(dbenv, rep, (dbenv, &mb,
- "Not enough votes to elect: recvd %d of %d from %d sites",
- rep->sites, rep->nvotes, rep->nsites));
- ret = DB_REP_UNAVAIL;
- goto err;
- } else {
- /*
- * We have seen enough vote1's. Now we need to wait
- * for all the vote2's.
- */
- if (send_vote != rep->eid) {
- RPRINT(dbenv, rep, (dbenv, &mb, "Sending vote"));
- __rep_send_vote(dbenv, NULL, 0, 0, 0, 0, egen,
- send_vote, REP_VOTE2);
- /*
- * If we are NOT the new master we want to send
- * our vote to the winner, and wait longer. The
- * reason is that the winner may be "behind" us
- * in the election waiting and if the master is
- * down, the winner will wait the full timeout
- * and we want to give the winner enough time to
- * process all the votes. Otherwise we could
- * incorrectly return DB_REP_UNAVAIL and start a
- * new election before the winner can declare
- * itself.
- */
- to = to * 2;
-
- }
-
-phase2: ret = __rep_wait(dbenv, to, eidp, REP_F_EPHASE2);
- RPRINT(dbenv, rep, (dbenv, &mb,
- "Ended election phase 2 %d", ret));
- switch (ret) {
- case 0:
- goto edone;
- case DB_REP_EGENCHG:
- if (to > timeout)
- to = timeout;
- to = (to * 8) / 10;
- RPRINT(dbenv, rep, (dbenv, &mb,
-"While waiting egen changed to %lu. Phase 2 New timeout %lu, orig timeout %lu",
- (u_long)rep->egen,
- (u_long)to, (u_long)timeout));
- goto restart;
- case DB_TIMEOUT:
- ret = DB_REP_UNAVAIL;
- break;
- default:
- goto err;
- }
- MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
- if (egen != rep->egen) {
- MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
- RPRINT(dbenv, rep, (dbenv, &mb,
- "Egen ph2 changed from %lu to %lu",
- (u_long)egen, (u_long)rep->egen));
- goto restart;
- }
- done = rep->votes >= rep->nvotes;
- RPRINT(dbenv, rep, (dbenv, &mb,
- "After phase 2: done %d, votes %d, nsites %d",
- done, rep->votes, rep->nsites));
- if (send_vote == rep->eid && done) {
- __rep_elect_master(dbenv, rep, eidp);
- ret = 0;
- goto lockdone;
- }
- MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
- }
-
-err: MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
-lockdone:
- /*
- * If we get here because of a non-election error, then we
- * did not tally our vote. The only non-election error is
- * from elect_init where we were unable to grow_sites. In
- * that case we do not want to discard all known election info.
- */
- if (ret == 0 || ret == DB_REP_UNAVAIL)
- __rep_elect_done(dbenv, rep);
- else if (orig_tally)
- F_SET(rep, orig_tally);
-
- /*
- * If the election finished elsewhere, we need to decrement
- * the elect_th anyway.
- */
- if (0)
-edone: MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
- rep->elect_th = 0;
-
- RPRINT(dbenv, rep, (dbenv, &mb,
- "Ended election with %d, sites %d, egen %lu, flags 0x%lx",
- ret, rep->sites, (u_long)rep->egen, (u_long)rep->flags));
- MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
-DB_TEST_RECOVERY_LABEL
return (ret);
}
/*
- * __rep_elect_init
- * Initialize an election. Sets beginp non-zero if the election is
- * already in progress; makes it 0 otherwise.
+ * __rep_get_timeout --
+ * Return through *timeout the replication timeout selected by "which".
+ * DB_REP_ELECTION_TIMEOUT is read from the region's elect_timeout when
+ * REP_ON(dbenv) is true, otherwise from the DB_REP handle. The
+ * DB_REP_ACK_TIMEOUT, DB_REP_ELECTION_RETRY and DB_REP_CONNECTION_RETRY
+ * selectors are only compiled in under HAVE_REPLICATION_THREADS and are
+ * always read from the DB_REP handle. Any other selector is reported,
+ * *timeout is left untouched and EINVAL is returned.
+ *
+ * PUBLIC: int __rep_get_timeout __P((DB_ENV *, int, db_timeout_t *));
*/
-static int
-__rep_elect_init(dbenv, lsnp, nsites, nvotes, priority, beginp, otally)
+int
+__rep_get_timeout(dbenv, which, timeout)
DB_ENV *dbenv;
- DB_LSN *lsnp;
- int nsites, nvotes, priority;
- int *beginp;
- u_int32_t *otally;
+ int which;
+ db_timeout_t *timeout;
{
DB_REP *db_rep;
REP *rep;
@@ -1264,128 +1189,178 @@ __rep_elect_init(dbenv, lsnp, nsites, nvotes, priority, beginp, otally)
db_rep = dbenv->rep_handle;
rep = db_rep->region;
-
ret = 0;
- /* We may miscount, as we don't hold the replication mutex here. */
- rep->stat.st_elections++;
-
- /* If we are already a master; simply broadcast that fact and return. */
- if (F_ISSET(rep, REP_F_MASTER)) {
- (void)__rep_send_message(dbenv,
- DB_EID_BROADCAST, REP_NEWMASTER, lsnp, NULL, 0);
- rep->stat.st_elections_won++;
- return (DB_REP_NEWMASTER);
+ switch (which) {
+ case DB_REP_ELECTION_TIMEOUT:
+ if (REP_ON(dbenv))
+ *timeout = rep->elect_timeout;
+ else
+ *timeout = db_rep->elect_timeout;
+ break;
+#ifdef HAVE_REPLICATION_THREADS
+ case DB_REP_ACK_TIMEOUT:
+ *timeout = db_rep->ack_timeout;
+ break;
+ case DB_REP_ELECTION_RETRY:
+ *timeout = db_rep->election_retry_wait;
+ break;
+ case DB_REP_CONNECTION_RETRY:
+ *timeout = db_rep->connection_retry_wait;
+ break;
+#endif
+ default:
+ __db_errx(dbenv,
+ "Unknown timeout type argument to DB_ENV->rep_get_timeout");
+ ret = EINVAL;
}
- MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
- if (otally != NULL)
- *otally = F_ISSET(rep, REP_F_TALLY);
- *beginp = IN_ELECTION(rep) || rep->elect_th;
- if (!*beginp) {
+ return (ret);
+}
+
+/*
+ * __rep_get_request --
+ * Get the minimum and maximum number of log records that we wait
+ * before retransmitting.
+ *
+ * The values are request_gap and max_gap, read from the region
+ * structure when REP_ON(dbenv) is true and from the DB_REP handle
+ * otherwise. Either output pointer may be NULL, in which case that
+ * value is simply not returned.
+ *
+ * !!!
+ * UNDOCUMENTED.
+ *
+ * PUBLIC: int __rep_get_request __P((DB_ENV *, u_int32_t *, u_int32_t *));
+ */
+int
+__rep_get_request(dbenv, minp, maxp)
+ DB_ENV *dbenv;
+ u_int32_t *minp, *maxp;
+{
+ DB_REP *db_rep;
+ REP *rep;
+
+ db_rep = dbenv->rep_handle;
+ ENV_NOT_CONFIGURED(
+ dbenv, db_rep->region, "DB_ENV->rep_get_request", DB_INIT_REP);
+
+ if (REP_ON(dbenv)) {
+ rep = db_rep->region;
/*
- * Make sure that we always initialize all the election fields
- * before putting ourselves in an election state. That means
- * issuing calls that can fail (allocation) before setting all
- * the variables.
+ * Only the replication system lock is taken here; this getter
+ * never touches mtx_clientdb.
*/
- if (nsites > rep->asites &&
- (ret = __rep_grow_sites(dbenv, nsites)) != 0)
- goto err;
- DB_ENV_TEST_RECOVERY(dbenv, DB_TEST_ELECTINIT, ret, NULL);
- rep->elect_th = 1;
- rep->nsites = nsites;
- rep->nvotes = nvotes;
- rep->priority = priority;
- rep->master_id = DB_EID_INVALID;
+ REP_SYSTEM_LOCK(dbenv);
+ if (minp != NULL)
+ *minp = rep->request_gap;
+ if (maxp != NULL)
+ *maxp = rep->max_gap;
+ REP_SYSTEM_UNLOCK(dbenv);
+ } else {
+ if (minp != NULL)
+ *minp = db_rep->request_gap;
+ if (maxp != NULL)
+ *maxp = db_rep->max_gap;
}
-DB_TEST_RECOVERY_LABEL
-err: MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
- return (ret);
+
+ return (0);
}
/*
- * __rep_elect_master
- * Set up for new master from election. Must be called with
- * the db_rep->rep_mutex held.
+ * __rep_set_request --
+ * Set the minimum and maximum number of log records that we wait
+ * before retransmitting.
+ *
+ * When REP_ON(dbenv) is true the values are stored in the region's
+ * request_gap/max_gap under the replication system lock; then, under
+ * mtx_clientdb, the log region's wait_recs and rcvd_recs counters are
+ * reset to 0 if a log handle and its primary region exist. Otherwise
+ * the values are only stored in the DB_REP handle.
+ *
+ * !!!
+ * UNDOCUMENTED.
 *
- * PUBLIC: void __rep_elect_master __P((DB_ENV *, REP *, int *));
+ * PUBLIC: int __rep_set_request __P((DB_ENV *, u_int32_t, u_int32_t));
*/
-void
-__rep_elect_master(dbenv, rep, eidp)
+int
+__rep_set_request(dbenv, min, max)
DB_ENV *dbenv;
- REP *rep;
- int *eidp;
+ u_int32_t min, max;
{
-#ifdef DIAGNOSTIC
- DB_MSGBUF mb;
-#else
- COMPQUIET(dbenv, NULL);
-#endif
- rep->master_id = rep->eid;
- F_SET(rep, REP_F_MASTERELECT);
- if (eidp != NULL)
- *eidp = rep->master_id;
- rep->stat.st_elections_won++;
- RPRINT(dbenv, rep, (dbenv, &mb,
- "Got enough votes to win; election done; winner is %d, gen %lu",
- rep->master_id, (u_long)rep->gen));
+ LOG *lp;
+ DB_LOG *dblp;
+ DB_REP *db_rep;
+ REP *rep;
+
+ db_rep = dbenv->rep_handle;
+ ENV_NOT_CONFIGURED(
+ dbenv, db_rep->region, "DB_ENV->rep_set_request", DB_INIT_REP);
+
+ if (REP_ON(dbenv)) {
+ rep = db_rep->region;
+ /*
+ * We acquire the mtx_region or mtx_clientdb mutexes as needed.
+ */
+ REP_SYSTEM_LOCK(dbenv);
+ rep->request_gap = min;
+ rep->max_gap = max;
+ REP_SYSTEM_UNLOCK(dbenv);
+
+ MUTEX_LOCK(dbenv, rep->mtx_clientdb);
+ dblp = dbenv->lg_handle;
+ if (dblp != NULL && (lp = dblp->reginfo.primary) != NULL) {
+ lp->wait_recs = 0;
+ lp->rcvd_recs = 0;
+ }
+ MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
+ } else {
+ db_rep->request_gap = min;
+ db_rep->max_gap = max;
+ }
+
+ return (0);
}
-static int
-__rep_wait(dbenv, timeout, eidp, flags)
+/*
+ * __rep_set_transport --
+ * Set the transport function for replication.
+ *
+ * A NULL send function or a negative eid is reported and rejected with
+ * EINVAL. Otherwise the callback is always stored in the DB_REP handle
+ * (db_rep->send); the eid is stored in the region under the replication
+ * system lock when REP_ON(dbenv) is true, or in the DB_REP handle
+ * otherwise, and 0 is returned.
+ *
+ * PUBLIC: int __rep_set_transport __P((DB_ENV *, int,
+ * PUBLIC: int (*)(DB_ENV *, const DBT *, const DBT *, const DB_LSN *,
+ * PUBLIC: int, u_int32_t)));
+ */
+int
+__rep_set_transport(dbenv, eid, f_send)
DB_ENV *dbenv;
- u_int32_t timeout;
- int *eidp;
- u_int32_t flags;
+ int eid;
+ int (*f_send) __P((DB_ENV *, const DBT *, const DBT *, const DB_LSN *,
+ int, u_int32_t));
{
DB_REP *db_rep;
REP *rep;
- int done, echg;
- u_int32_t egen, sleeptime;
- done = echg = 0;
- db_rep = dbenv->rep_handle;
- rep = db_rep->region;
- egen = rep->egen;
+ if (f_send == NULL) {
+ __db_errx(dbenv,
+ "DB_ENV->rep_set_transport: no send function specified");
+ return (EINVAL);
+ }
- /*
- * The user specifies an overall timeout function, but checking
- * is cheap and the timeout may be a generous upper bound.
- * Sleep repeatedly for the smaller of .5s and timeout/10.
- */
- sleeptime = (timeout > 5000000) ? 500000 : timeout / 10;
- if (sleeptime == 0)
- sleeptime++;
- while (timeout > 0) {
- __os_sleep(dbenv, 0, sleeptime);
- MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
- echg = egen != rep->egen;
- done = !F_ISSET(rep, flags) && rep->master_id != DB_EID_INVALID;
-
- *eidp = rep->master_id;
- MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
-
- if (done)
- return (0);
-
- if (echg)
- return (DB_REP_EGENCHG);
-
- if (timeout > sleeptime)
- timeout -= sleeptime;
- else
- timeout = 0;
+ if (eid < 0) {
+ __db_errx(dbenv,
+ "DB_ENV->rep_set_transport: eid must be greater than or equal to 0");
+ return (EINVAL);
}
- return (DB_TIMEOUT);
+
+ db_rep = dbenv->rep_handle;
+ db_rep->send = f_send;
+
+ if (REP_ON(dbenv)) {
+ rep = db_rep->region;
+ REP_SYSTEM_LOCK(dbenv);
+ rep->eid = eid;
+ REP_SYSTEM_UNLOCK(dbenv);
+ } else
+ db_rep->eid = eid;
+ return (0);
}
/*
* __rep_flush --
* Re-push the last log record to all clients, in case they've lost
- * messages and don't know it.
+ * messages and don't know it.
+ *
+ * PUBLIC: int __rep_flush __P((DB_ENV *));
*/
-static int
+int
__rep_flush(dbenv)
DB_ENV *dbenv;
{
@@ -1395,7 +1370,8 @@ __rep_flush(dbenv)
int ret, t_ret;
PANIC_CHECK(dbenv);
- ENV_REQUIRES_CONFIG(dbenv, dbenv->rep_handle, "rep_flush", DB_INIT_REP);
+ ENV_REQUIRES_CONFIG_XX(
+ dbenv, rep_handle, "DB_ENV->rep_flush", DB_INIT_REP);
if ((ret = __log_cursor(dbenv, &logc)) != 0)
return (ret);
@@ -1407,9 +1383,109 @@ __rep_flush(dbenv)
goto err;
(void)__rep_send_message(dbenv,
- DB_EID_BROADCAST, REP_LOG, &lsn, &rec, 0);
+ DB_EID_BROADCAST, REP_LOG, &lsn, &rec, 0, 0);
err: if ((t_ret = __log_c_close(logc)) != 0 && ret == 0)
ret = t_ret;
return (ret);
}
+
+/*
+ * __rep_sync --
+ * Force a synchronization to occur between this client and the master.
+ * This is the other half of configuring DELAYCLIENT.
+ *
+ * The flags argument is currently unused. If no master is known, a
+ * REP_MASTER_REQ is broadcast and nothing else is done. If the
+ * REP_F_DELAY flag is not set, nothing is done. Otherwise the flag is
+ * cleared and a REP_ALL_REQ (zero verify_lsn) or REP_VERIFY_REQ is sent
+ * to the master. Results of the sends are ignored; every path after
+ * the PANIC_CHECK/ENV_REQUIRES_CONFIG_XX entry checks returns 0.
+ *
+ * PUBLIC: int __rep_sync __P((DB_ENV *, u_int32_t));
+ */
+int
+__rep_sync(dbenv, flags)
+ DB_ENV *dbenv;
+ u_int32_t flags;
+{
+ DB_LOG *dblp;
+ DB_LSN lsn;
+ DB_REP *db_rep;
+ LOG *lp;
+ REP *rep;
+ int master;
+ u_int32_t type;
+
+ COMPQUIET(flags, 0);
+
+ PANIC_CHECK(dbenv);
+ ENV_REQUIRES_CONFIG_XX(
+ dbenv, rep_handle, "DB_ENV->rep_sync", DB_INIT_REP);
+
+ dblp = dbenv->lg_handle;
+ lp = dblp->reginfo.primary;
+ db_rep = dbenv->rep_handle;
+ rep = db_rep->region;
+
+ /*
+ * Simple cases. If we're not in the DELAY state we have nothing
+ * to do. If we don't know who the master is, send a MASTER_REQ.
+ *
+ * verify_lsn is copied under mtx_clientdb first, before the
+ * replication system lock is taken; the two are never held together.
+ */
+ MUTEX_LOCK(dbenv, rep->mtx_clientdb);
+ lsn = lp->verify_lsn;
+ MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
+ REP_SYSTEM_LOCK(dbenv);
+ master = rep->master_id;
+ if (master == DB_EID_INVALID) {
+ REP_SYSTEM_UNLOCK(dbenv);
+ (void)__rep_send_message(dbenv, DB_EID_BROADCAST,
+ REP_MASTER_REQ, NULL, NULL, 0, 0);
+ return (0);
+ }
+ /*
+ * We want to hold the rep mutex to test and then clear the
+ * DELAY flag. Racing threads in here could otherwise result
+ * in dual data streams.
+ */
+ if (!F_ISSET(rep, REP_F_DELAY)) {
+ REP_SYSTEM_UNLOCK(dbenv);
+ return (0);
+ }
+
+ /*
+ * If we get here, we clear the delay flag and kick off a
+ * synchronization. From this point forward, we will
+ * synchronize until the next time the master changes.
+ */
+ F_CLR(rep, REP_F_DELAY);
+ REP_SYSTEM_UNLOCK(dbenv);
+ /*
+ * When we set REP_F_DELAY, we set verify_lsn to the real verify
+ * lsn if we need to verify, or we zeroed it out if this is a client
+ * that needs to sync up from the beginning. So, send the type
+ * of message now that __rep_new_master delayed sending.
+ */
+ if (IS_ZERO_LSN(lsn))
+ type = REP_ALL_REQ;
+ else
+ type = REP_VERIFY_REQ;
+ (void)__rep_send_message(dbenv, master, type, &lsn, NULL, 0,
+ DB_REP_ANYWHERE);
+ return (0);
+}
+
+/*
+ * __rep_conv_vers --
+ * Convert from a log version to the replication message version
+ * that release used.
+ *
+ * Maps each DB_LOGVERSION_42 .. DB_LOGVERSION_45 constant to the
+ * matching DB_REPVERSION_* constant and returns DB_REPVERSION_INVALID
+ * for any other log_ver. The dbenv argument is unused.
+ */
+static u_int32_t
+__rep_conv_vers(dbenv, log_ver)
+ DB_ENV *dbenv;
+ u_int32_t log_ver;
+{
+ COMPQUIET(dbenv, NULL);
+ if (log_ver == DB_LOGVERSION_42)
+ return (DB_REPVERSION_42);
+ if (log_ver == DB_LOGVERSION_43)
+ return (DB_REPVERSION_43);
+ if (log_ver == DB_LOGVERSION_44)
+ return (DB_REPVERSION_44);
+ if (log_ver == DB_LOGVERSION_45)
+ return (DB_REPVERSION_45);
+ return (DB_REPVERSION_INVALID);
+}