summaryrefslogtreecommitdiff
path: root/db/repmgr
diff options
context:
space:
mode:
authorPanu Matilainen <pmatilai@redhat.com>2007-07-16 16:48:14 +0300
committerPanu Matilainen <pmatilai@redhat.com>2007-07-16 16:48:14 +0300
commit2cfd3012bfcb5c5c61bbaf662ef084e0ab789d79 (patch)
treee12ee52087506ac8c7a5eee83b17497d98df2d40 /db/repmgr
parentb754fe19fd387ca5fe8e7c00ddaa25c898fa192f (diff)
downloadlibrpm-tizen-2cfd3012bfcb5c5c61bbaf662ef084e0ab789d79.tar.gz
librpm-tizen-2cfd3012bfcb5c5c61bbaf662ef084e0ab789d79.tar.bz2
librpm-tizen-2cfd3012bfcb5c5c61bbaf662ef084e0ab789d79.zip
Update internal BDB to version 4.5.20
Diffstat (limited to 'db/repmgr')
-rw-r--r--db/repmgr/repmgr_elect.c369
-rw-r--r--db/repmgr/repmgr_method.c442
-rw-r--r--db/repmgr/repmgr_msg.c333
-rw-r--r--db/repmgr/repmgr_net.c1041
-rw-r--r--db/repmgr/repmgr_posix.c714
-rw-r--r--db/repmgr/repmgr_queue.c158
-rw-r--r--db/repmgr/repmgr_sel.c875
-rw-r--r--db/repmgr/repmgr_stat.c116
-rw-r--r--db/repmgr/repmgr_util.c415
-rw-r--r--db/repmgr/repmgr_windows.c708
10 files changed, 5171 insertions, 0 deletions
diff --git a/db/repmgr/repmgr_elect.c b/db/repmgr/repmgr_elect.c
new file mode 100644
index 000000000..39ed1e863
--- /dev/null
+++ b/db/repmgr/repmgr_elect.c
@@ -0,0 +1,369 @@
+/*-
+ * See the file LICENSE for redistribution information.
+ *
+ * Copyright (c) 2005-2006
+ * Oracle Corporation. All rights reserved.
+ *
+ * $Id: repmgr_elect.c,v 1.23 2006/09/12 01:06:34 alanb Exp $
+ */
+
+#include "db_config.h"
+
+#define __INCLUDE_NETWORKING 1
+#include "db_int.h"
+
+static int __repmgr_is_ready __P((DB_ENV *));
+static int __repmgr_elect_main __P((DB_ENV *));
+static void *__repmgr_elect_thread __P((void *));
+static int start_election_thread __P((DB_ENV *));
+
+/*
+ * Starts the election thread, or wakes up an existing one, starting off with
+ * the specified operation (an election, or a call to rep_start(CLIENT), or
+ * nothing). Avoid multiple concurrent elections.
+ *
+ * PUBLIC: int __repmgr_init_election __P((DB_ENV *, int));
+ *
+ * !!!
+ * Caller must hold mutex.
+ */
+int
+__repmgr_init_election(dbenv, initial_operation)
+ DB_ENV *dbenv;
+ int initial_operation;
+{
+ DB_REP *db_rep;
+ int ret;
+#ifdef DIAGNOSTIC
+ DB_MSGBUF mb;
+#endif
+
+ db_rep = dbenv->rep_handle;
+
+ /* Repmgr is shutting down: silently succeed without starting anything. */
+ if (db_rep->finished) {
+ RPRINT(dbenv, (dbenv, &mb,
+ "ignoring elect thread request %d; repmgr is finished",
+ initial_operation));
+ return (0);
+ }
+
+ /* Publish the requested operation for the elect thread to pick up. */
+ db_rep->operation_needed = initial_operation;
+ if (db_rep->elect_thread == NULL)
+ ret = start_election_thread(dbenv);
+ else if (db_rep->elect_thread->finished) {
+ /* Previous thread exited on its own: reap it, then start anew. */
+ RPRINT(dbenv, (dbenv, &mb, "join dead elect thread"));
+ if ((ret = __repmgr_thread_join(db_rep->elect_thread)) != 0)
+ return (ret);
+ __os_free(dbenv, db_rep->elect_thread);
+ db_rep->elect_thread = NULL;
+ ret = start_election_thread(dbenv);
+ } else {
+ /* Thread is alive but waiting: just wake it up. */
+ RPRINT(dbenv, (dbenv, &mb, "reusing existing elect thread"));
+ if ((ret = __repmgr_signal(&db_rep->check_election)) != 0)
+ __db_err(dbenv, ret, "can't signal election thread");
+ }
+ return (ret);
+}
+
+/*
+ * !!!
+ * Caller holds mutex.
+ */
+static int
+start_election_thread(dbenv)
+ DB_ENV *dbenv;
+{
+ REPMGR_RUNNABLE *runnable;
+ DB_REP *db_rep;
+ int ret;
+
+ db_rep = dbenv->rep_handle;
+
+ /* Allocate the thread control block and point it at the elect loop. */
+ ret = __os_malloc(dbenv, sizeof(REPMGR_RUNNABLE), &runnable);
+ if (ret != 0)
+ return (ret);
+ runnable->dbenv = dbenv;
+ runnable->run = __repmgr_elect_thread;
+
+ ret = __repmgr_thread_start(dbenv, runnable);
+ if (ret != 0) {
+ /* The thread never started; reclaim the control block. */
+ __os_free(dbenv, runnable);
+ return (ret);
+ }
+ db_rep->elect_thread = runnable;
+ return (0);
+}
+
+static void *
+__repmgr_elect_thread(args)
+ void *args;
+{
+ int ret;
+ DB_ENV *dbenv = args;
+#ifdef DIAGNOSTIC
+ DB_MSGBUF mb;
+#endif
+
+ RPRINT(dbenv, (dbenv, &mb, "starting election thread"));
+
+ /* Run the election loop; report any fatal failure to repmgr. */
+ ret = __repmgr_elect_main(dbenv);
+ if (ret != 0) {
+ __db_err(dbenv, ret, "election thread failed");
+ __repmgr_thread_failure(dbenv, ret);
+ }
+
+ RPRINT(dbenv, (dbenv, &mb, "election thread is exiting"));
+ return (NULL);
+}
+
+/*
+ * Main loop of the election thread: alternates between calling for an
+ * election and (re)starting as a client, until a master is found or
+ * repmgr shuts down.
+ */
+static int
+__repmgr_elect_main(dbenv)
+ DB_ENV *dbenv;
+{
+ DB_REP *db_rep;
+ DBT my_addr;
+#ifdef DB_WIN32
+ DWORD duration;
+#else
+ struct timespec deadline;
+#endif
+ u_int nsites, nvotes;
+ int chosen_master, done, failure_recovery, last_op, ret, to_do;
+#ifdef DIAGNOSTIC
+ DB_MSGBUF mb;
+#endif
+
+ db_rep = dbenv->rep_handle;
+ last_op = 0;
+
+ /*
+ * db_rep->operation_needed is the mechanism by which the outside world
+ * (running in a different thread) tells us what it wants us to do. It
+ * is obviously relevant when we're just starting up. But it can also
+ * be set if a subsequent request for us to do something occurs while
+ * we're still looping.
+ *
+ * ELECT_FAILURE_ELECTION asks us to start by doing an election, but to
+ * do so in failure recovery mode. This failure recovery mode may
+ * persist through several loop iterations: as long as it takes us to
+ * succeed in finding a master, or until we get asked to perform a new
+ * request. Thus the time for mapping ELECT_FAILURE_ELECTION to the
+ * internal ELECT_ELECTION, as well as the setting of the failure
+ * recovery flag, is at the point we receive the new request from
+ * operation_needed (either here, or within the loop below).
+ */
+ LOCK_MUTEX(db_rep->mutex);
+ if (db_rep->finished) {
+ db_rep->elect_thread->finished = TRUE;
+ UNLOCK_MUTEX(db_rep->mutex);
+ return (0);
+ }
+ /* Consume the pending request under mutex protection. */
+ to_do = db_rep->operation_needed;
+ db_rep->operation_needed = 0;
+ UNLOCK_MUTEX(db_rep->mutex);
+ if (to_do == ELECT_FAILURE_ELECTION) {
+ failure_recovery = TRUE;
+ to_do = ELECT_ELECTION;
+ } else
+ failure_recovery = FALSE;
+
+ for (;;) {
+ RPRINT(dbenv, (dbenv, &mb, "elect thread to do: %d", to_do));
+ switch (to_do) {
+ case ELECT_ELECTION:
+ nsites = __repmgr_get_nsites(db_rep);
+
+ /*
+ * Until a master has ever been found, the FULL_ELECTION
+ * policy requires every site's vote; otherwise a simple
+ * majority suffices.
+ */
+ if (db_rep->init_policy == DB_REP_FULL_ELECTION &&
+ !db_rep->found_master)
+ nvotes = nsites;
+ else {
+ nvotes = ELECTION_MAJORITY(nsites);
+
+ /*
+ * If we're doing an election because we noticed
+ * that the master failed, it's reasonable to
+ * expect that the master won't participate. By
+ * not waiting for its vote, we can probably
+ * complete the election faster. But note that
+ * we shouldn't allow this to affect nvotes
+ * calculation.
+ */
+ if (failure_recovery) {
+ nsites--;
+
+ if (nsites == 1) {
+ /*
+ * We've just lost the only
+ * other site in the group, so
+ * there's no point in holding
+ * an election.
+ */
+ if ((ret =
+ __repmgr_become_master(
+ dbenv)) != 0)
+ return (ret);
+ break;
+ }
+ }
+ }
+
+ switch (ret = __rep_elect(dbenv,
+ (int)nsites, (int)nvotes, &chosen_master, 0)) {
+ case DB_REP_UNAVAIL:
+ /* No winner this time; we'll retry below. */
+ break;
+
+ case 0:
+ if (chosen_master == SELF_EID &&
+ (ret = __repmgr_become_master(dbenv)) != 0)
+ return (ret);
+ break;
+
+ default:
+ __db_err(
+ dbenv, ret, "unexpected election failure");
+ return (ret);
+ }
+ last_op = ELECT_ELECTION;
+ break;
+ case ELECT_REPSTART:
+ /* Re-announce ourselves as a client. */
+ if ((ret =
+ __repmgr_prepare_my_addr(dbenv, &my_addr)) != 0)
+ return (ret);
+ ret = __rep_start(dbenv, &my_addr, DB_REP_CLIENT);
+ __os_free(dbenv, my_addr.data);
+ if (ret != 0) {
+ __db_err(dbenv, ret, "rep_start");
+ return (ret);
+ }
+ last_op = ELECT_REPSTART;
+ break;
+ case 0:
+ /*
+ * Nothing to do: this can happen the first time
+ * through, on initialization.
+ */
+ last_op = 0;
+ break;
+ default:
+ DB_ASSERT(dbenv, FALSE);
+ }
+
+ /* Wait (bounded by election_retry_wait) until woken or ready. */
+ LOCK_MUTEX(db_rep->mutex);
+ while (!__repmgr_is_ready(dbenv)) {
+#ifdef DB_WIN32
+ duration = db_rep->election_retry_wait / 1000;
+ ret = SignalObjectAndWait(db_rep->mutex,
+ db_rep->check_election, duration, FALSE);
+ LOCK_MUTEX(db_rep->mutex);
+ if (ret == WAIT_TIMEOUT)
+ break;
+ DB_ASSERT(dbenv, ret == WAIT_OBJECT_0);
+#else
+ __repmgr_compute_wait_deadline(dbenv, &deadline,
+ db_rep->election_retry_wait);
+ if ((ret = pthread_cond_timedwait(
+ &db_rep->check_election, &db_rep->mutex, &deadline))
+ == ETIMEDOUT)
+ break;
+ DB_ASSERT(dbenv, ret == 0);
+#endif
+ }
+
+ /*
+ * Ways we can get here: time out, operation needed, master
+ * becomes valid, or thread shut-down command.
+ *
+ * If we're not yet done, figure out what to do next: if we've
+ * been told explicitly what to do (operation_needed), do that.
+ * Otherwise, what we do next is approximately the complement of
+ * what we just did; in other words, we alternate.
+ */
+ done = IS_VALID_EID(db_rep->master_eid) || db_rep->finished;
+ if (done)
+ db_rep->elect_thread->finished = TRUE;
+ else if ((to_do = db_rep->operation_needed) == 0) {
+ if (last_op == ELECT_ELECTION)
+ to_do = ELECT_REPSTART;
+ else {
+ /*
+ * Generally, if what we previously did is a
+ * rep_start (or nothing, which really just
+ * means another thread did the rep_start before
+ * turning us on), then we next do an election.
+ * However, with the REP_CLIENT init policy we
+ * never do an initial election.
+ */
+ to_do = ELECT_ELECTION;
+ if (db_rep->init_policy == DB_REP_CLIENT &&
+ !db_rep->found_master)
+ to_do = ELECT_REPSTART;
+ }
+ } else {
+ /* Explicit new request: consume it, as at start-up. */
+ db_rep->operation_needed = 0;
+ if (to_do == ELECT_FAILURE_ELECTION) {
+ failure_recovery = TRUE;
+ to_do = ELECT_ELECTION;
+ } else
+ failure_recovery = FALSE;
+ }
+
+ /*
+ * TODO: is it possible for an operation_needed to be set, with
+ * nevertheless a valid master? I don't think so. Would a more
+ * straightforward exit test involve "operation_needed" instead
+ * of (or in addition to) valid master?
+ */
+
+ UNLOCK_MUTEX(db_rep->mutex);
+ if (done)
+ return (0);
+ }
+}
+
+/*
+ * Tests whether the election thread is ready to do something, or if it should
+ * wait a little while.
+ */
+static int
+__repmgr_is_ready(dbenv)
+ DB_ENV *dbenv;
+{
+ DB_REP *db_rep;
+ int ready;
+#ifdef DIAGNOSTIC
+ DB_MSGBUF mb;
+#endif
+
+ db_rep = dbenv->rep_handle;
+
+ RPRINT(dbenv, (dbenv, &mb,
+ "repmgr elect: opcode %d, finished %d, master %d",
+ db_rep->operation_needed, db_rep->finished, db_rep->master_eid));
+
+ /* Ready when asked to act, shutting down, or a master is known. */
+ ready = db_rep->operation_needed != 0;
+ ready = ready || db_rep->finished;
+ ready = ready || IS_VALID_EID(db_rep->master_eid);
+ return (ready);
+}
+
+/*
+ * PUBLIC: int __repmgr_become_master __P((DB_ENV *));
+ */
+int
+__repmgr_become_master(dbenv)
+ DB_ENV *dbenv;
+{
+ DBT my_addr;
+ DB_REP *db_rep;
+ int ret;
+
+ db_rep = dbenv->rep_handle;
+
+ /* Record that we are now the master site. */
+ db_rep->master_eid = SELF_EID;
+ db_rep->found_master = TRUE;
+
+ ret = __repmgr_prepare_my_addr(dbenv, &my_addr);
+ if (ret != 0)
+ return (ret);
+
+ /* Announce mastership, then capture the resulting generation. */
+ ret = __rep_start(dbenv, &my_addr, DB_REP_MASTER);
+ __os_free(dbenv, my_addr.data);
+ if (ret == 0)
+ ret = __repmgr_stash_generation(dbenv);
+ return (ret);
+}
diff --git a/db/repmgr/repmgr_method.c b/db/repmgr/repmgr_method.c
new file mode 100644
index 000000000..d2fdb8eb9
--- /dev/null
+++ b/db/repmgr/repmgr_method.c
@@ -0,0 +1,442 @@
+/*-
+ * See the file LICENSE for redistribution information.
+ *
+ * Copyright (c) 2005-2006
+ * Oracle Corporation. All rights reserved.
+ *
+ * $Id: repmgr_method.c,v 1.28 2006/09/11 15:15:20 bostic Exp $
+ */
+
+#include "db_config.h"
+
+#define __INCLUDE_NETWORKING 1
+#include "db_int.h"
+
+static int __repmgr_await_threads __P((DB_ENV *));
+
+/*
+ * TODO: should (more of) this function be protected by mutex? Caution: calling
+ * rep_start while holding mutex doesn't work, 'cuz it pushes out a message to
+ * the send() function.
+ */
+/*
+ * PUBLIC: int __repmgr_start __P((DB_ENV *, int, u_int32_t));
+ */
+int
+__repmgr_start(dbenv, nthreads, flags)
+ DB_ENV *dbenv;
+ int nthreads;
+ u_int32_t flags;
+{
+ DB_REP *db_rep;
+ DBT my_addr;
+ REPMGR_RUNNABLE *selector, *messenger;
+ int ret, i;
+
+ db_rep = dbenv->rep_handle;
+
+ /* Check that the required initialization has been done. */
+ if (db_rep->my_addr.port == 0) {
+ __db_errx(dbenv,
+ "repmgr_set_local_site must be called before repmgr_start");
+ return (EINVAL);
+ }
+
+ /* Disallow restarting: once selector exists (or shutdown began). */
+ if (db_rep->selector != NULL || db_rep->finished) {
+ __db_errx(dbenv,
+ "DB_ENV->repmgr_start may not be called more than once");
+ return (EINVAL);
+ }
+
+ /* Validate the start-up policy flag. */
+ switch (flags) {
+ case DB_REP_CLIENT:
+ case DB_REP_ELECTION:
+ case DB_REP_FULL_ELECTION:
+ case DB_REP_MASTER:
+ break;
+ default:
+ __db_errx(dbenv,
+ "repmgr_start: unrecognized flags parameter value");
+ return (EINVAL);
+ }
+
+ if (nthreads <= 0) {
+ __db_errx(dbenv,
+ "repmgr_start: nthreads parameter must be >= 1");
+ return (EINVAL);
+ }
+
+ /* Array of message-thread handles; freed in dbenv_destroy/await. */
+ if ((ret =
+ __os_calloc(dbenv, (u_int)nthreads, sizeof(REPMGR_RUNNABLE *),
+ &db_rep->messengers)) != 0)
+ return (ret);
+ db_rep->nthreads = nthreads;
+
+ if ((ret = __repmgr_net_init(dbenv, db_rep)) != 0 ||
+ (ret = __repmgr_init_sync(dbenv, db_rep)) != 0 ||
+ (ret = __rep_set_transport(dbenv, SELF_EID, __repmgr_send)) != 0)
+ return (ret);
+
+ /*
+ * Make some sort of call to rep_start before starting other threads, to
+ * ensure that incoming messages being processed always have a rep
+ * context properly configured.
+ */
+ if ((db_rep->init_policy = flags) == DB_REP_MASTER)
+ ret = __repmgr_become_master(dbenv);
+ else {
+ if ((ret = __repmgr_prepare_my_addr(dbenv, &my_addr)) != 0)
+ return (ret);
+ ret = __rep_start(dbenv, &my_addr, DB_REP_CLIENT);
+ __os_free(dbenv, my_addr.data);
+ if (ret == 0) {
+ LOCK_MUTEX(db_rep->mutex);
+ ret = __repmgr_init_election(dbenv, 0);
+ UNLOCK_MUTEX(db_rep->mutex);
+ }
+ }
+ if (ret != 0)
+ return (ret);
+
+ /* Start the single select() thread. */
+ if ((ret = __os_calloc(dbenv, 1, sizeof(REPMGR_RUNNABLE), &selector))
+ != 0)
+ return (ret);
+ selector->dbenv = dbenv;
+ selector->run = __repmgr_select_thread;
+ if ((ret = __repmgr_thread_start(dbenv, selector)) != 0) {
+ __db_err(dbenv, ret, "can't start selector thread");
+ __os_free(dbenv, selector);
+ return (ret);
+ } else
+ db_rep->selector = selector;
+
+ /*
+ * Start the message-processing threads. NOTE(review): on a failure
+ * part-way through, threads already started are presumably reaped by
+ * a subsequent DB_ENV->repmgr/env close -- confirm against callers.
+ */
+ for (i=0; i<nthreads; i++) {
+ if ((ret = __os_calloc(dbenv, 1, sizeof(REPMGR_RUNNABLE),
+ &messenger)) != 0)
+ return (ret);
+
+ messenger->dbenv = dbenv;
+ messenger->run = __repmgr_msg_thread;
+ if ((ret = __repmgr_thread_start(dbenv, messenger)) != 0) {
+ __os_free(dbenv, messenger);
+ return (ret);
+ }
+ db_rep->messengers[i] = messenger;
+ }
+
+ return (ret);
+}
+
+/*
+ * PUBLIC: int __repmgr_close __P((DB_ENV *));
+ */
+int
+__repmgr_close(dbenv)
+ DB_ENV *dbenv;
+{
+ DB_REP *db_rep;
+ int ret, tmp;
+#ifdef DIAGNOSTIC
+ DB_MSGBUF mb;
+#endif
+
+ db_rep = dbenv->rep_handle;
+ ret = 0;
+
+ /* Only tear down threads if repmgr_start ever ran. */
+ if (db_rep->selector != NULL) {
+ RPRINT(dbenv, (dbenv, &mb, "Stopping repmgr threads"));
+ ret = __repmgr_stop_threads(dbenv);
+ tmp = __repmgr_await_threads(dbenv);
+ if (ret == 0)
+ ret = tmp;
+ RPRINT(dbenv, (dbenv, &mb, "Repmgr threads are finished"));
+ }
+
+ /* Close networking and sync state; keep the first error seen. */
+ tmp = __repmgr_net_close(dbenv);
+ if (ret == 0)
+ ret = tmp;
+
+ tmp = __repmgr_close_sync(dbenv);
+ if (ret == 0)
+ ret = tmp;
+
+ return (ret);
+}
+
+/*
+ * PUBLIC: int __repmgr_set_ack_policy __P((DB_ENV *, int));
+ */
+int
+__repmgr_set_ack_policy(dbenv, policy)
+ DB_ENV *dbenv;
+ int policy;
+{
+ /* Accept only the documented acknowledgement policies. */
+ switch (policy) {
+ case DB_REPMGR_ACKS_ALL:
+ case DB_REPMGR_ACKS_ALL_PEERS:
+ case DB_REPMGR_ACKS_NONE:
+ case DB_REPMGR_ACKS_ONE:
+ case DB_REPMGR_ACKS_ONE_PEER:
+ case DB_REPMGR_ACKS_QUORUM:
+ break;
+ default:
+ __db_errx(dbenv,
+ "Unknown ack_policy in DB_ENV->repmgr_set_ack_policy");
+ return (EINVAL);
+ }
+ dbenv->rep_handle->perm_policy = policy;
+ return (0);
+}
+
+/*
+ * PUBLIC: int __repmgr_get_ack_policy __P((DB_ENV *, int *));
+ */
+int
+__repmgr_get_ack_policy(dbenv, policy)
+ DB_ENV *dbenv;
+ int *policy;
+{
+ DB_REP *db_rep;
+
+ /* Report the currently configured acknowledgement policy. */
+ db_rep = dbenv->rep_handle;
+ *policy = db_rep->perm_policy;
+ return (0);
+}
+
+/*
+ * PUBLIC: int __repmgr_dbenv_create __P((DB_ENV *, DB_REP *));
+ */
+int
+__repmgr_dbenv_create(dbenv, db_rep)
+ DB_ENV *dbenv;
+ DB_REP *db_rep;
+{
+ int ret;
+
+ /* Default timeouts and retry intervals, in microseconds. */
+ db_rep->elect_timeout = 2 * 1000000; /* 2 seconds */
+ db_rep->ack_timeout = 1 * 1000000; /* 1 second */
+ db_rep->connection_retry_wait = 30 * 1000000; /* 30 seconds */
+ db_rep->election_retry_wait = 10 * 1000000; /* 10 seconds */
+
+ /* Default site/policy configuration. */
+ db_rep->config_nsites = 0;
+ db_rep->peer = DB_EID_INVALID;
+ db_rep->perm_policy = DB_REPMGR_ACKS_QUORUM;
+ db_rep->my_priority = 100;
+
+ /*
+ * Mark the platform-specific wakeup machinery as not yet initialized,
+ * so later cleanup code can tell what needs tearing down. (A single
+ * explicit "initialized" flag might be a simpler scheme.)
+ */
+#ifdef DB_WIN32
+ db_rep->waiters = NULL;
+#else
+ db_rep->read_pipe = db_rep->write_pipe = -1;
+#endif
+ ret = __repmgr_net_create(dbenv, db_rep);
+ if (ret == 0)
+ ret = __repmgr_queue_create(dbenv, db_rep);
+
+ return (ret);
+}
+
+/*
+ * PUBLIC: void __repmgr_dbenv_destroy __P((DB_ENV *, DB_REP *));
+ */
+void
+__repmgr_dbenv_destroy(dbenv, db_rep)
+ DB_ENV *dbenv;
+ DB_REP *db_rep;
+{
+ /* Tear down subsystems, then release the messenger array. */
+ __repmgr_queue_destroy(dbenv);
+ __repmgr_net_destroy(dbenv, db_rep);
+ if (db_rep->messengers == NULL)
+ return;
+ __os_free(dbenv, db_rep->messengers);
+ db_rep->messengers = NULL;
+}
+
+/*
+ * PUBLIC: int __repmgr_stop_threads __P((DB_ENV *));
+ */
+int
+__repmgr_stop_threads(dbenv)
+ DB_ENV *dbenv;
+{
+ DB_REP *db_rep;
+ int ret;
+
+ db_rep = dbenv->rep_handle;
+
+ /*
+ * Hold mutex for the purpose of waking up threads, but then get out of
+ * the way to let them clean up and exit.
+ */
+ LOCK_MUTEX(db_rep->mutex);
+ db_rep->finished = TRUE;
+ /* Wake the election thread, if one was ever started. */
+ if (db_rep->elect_thread != NULL &&
+ (ret = __repmgr_signal(&db_rep->check_election)) != 0)
+ goto unlock;
+
+ /* Wake any message threads blocked on an empty queue. */
+ if ((ret = __repmgr_signal(&db_rep->queue_nonempty)) != 0)
+ goto unlock;
+ UNLOCK_MUTEX(db_rep->mutex);
+
+ /* Finally, nudge the select() thread out of its wait. */
+ return (__repmgr_wake_main_thread(dbenv));
+
+unlock:
+ UNLOCK_MUTEX(db_rep->mutex);
+ return (ret);
+}
+
+/*
+ * Joins and frees all repmgr threads (elect, messengers, selector),
+ * returning the first join error encountered.
+ */
+static int
+__repmgr_await_threads(dbenv)
+ DB_ENV *dbenv;
+{
+ DB_REP *db_rep;
+ REPMGR_RUNNABLE *messenger;
+ int ret, t_ret, i;
+
+ db_rep = dbenv->rep_handle;
+ ret = 0;
+ if (db_rep->elect_thread != NULL) {
+ ret = __repmgr_thread_join(db_rep->elect_thread);
+ __os_free(dbenv, db_rep->elect_thread);
+ db_rep->elect_thread = NULL;
+ }
+
+ /* TODO: if the join fails, how/when do we clean up the memory? */
+ for (i=0; i<db_rep->nthreads && db_rep->messengers[i] != NULL; i++) {
+ messenger = db_rep->messengers[i];
+ if ((t_ret = __repmgr_thread_join(messenger)) != 0 && ret == 0)
+ ret = t_ret;
+ __os_free(dbenv, messenger);
+ db_rep->messengers[i] = NULL; /* necessary? */
+ }
+ __os_free(dbenv, db_rep->messengers);
+ db_rep->messengers = NULL;
+
+ if (db_rep->selector != NULL) {
+ if ((t_ret = __repmgr_thread_join(db_rep->selector)) != 0 &&
+ ret == 0)
+ ret = t_ret;
+ __os_free(dbenv, db_rep->selector);
+ db_rep->selector = NULL;
+ }
+
+ return (ret);
+}
+
+/*
+ * PUBLIC: int __repmgr_set_local_site __P((DB_ENV *, const char *, u_int,
+ * PUBLIC: u_int32_t));
+ */
+int
+__repmgr_set_local_site(dbenv, host, port, flags)
+ DB_ENV *dbenv;
+ const char *host;
+ u_int port;
+ u_int32_t flags;
+{
+ ADDRINFO *address_list;
+ DB_REP *db_rep;
+ repmgr_netaddr_t addr;
+ int locked, ret;
+ const char *sharable_host;
+ char buffer[MAXHOSTNAMELEN];
+
+ /* No flags are currently defined for this method. */
+ if (flags != 0)
+ return (__db_ferr(dbenv, "DB_ENV->repmgr_set_local_site", 0));
+
+ db_rep = dbenv->rep_handle;
+ /* The local address may only be set once. */
+ if (db_rep->my_addr.port != 0) {
+ __db_errx(dbenv, "Listen address already set");
+ return (EINVAL);
+ }
+
+ /*
+ * If we haven't been given a sharable local host name, we must get one
+ * for the purpose of sharing with our friends, but it's ok to use for
+ * the address look-up. "Helpfully" converting special names such as
+ * "localhost" in this same way is tempting, but in practice turns out
+ * to be too tricky.
+ */
+ if (host == NULL) {
+ if ((ret = gethostname(buffer, sizeof(buffer))) != 0)
+ return (net_errno);
+
+ /* In case truncation leaves no terminating NUL byte: */
+ buffer[sizeof(buffer) - 1] = '\0';
+ sharable_host = buffer;
+ } else
+ sharable_host = host;
+
+ /* Resolve a listen address (AI_PASSIVE: host may be NULL). */
+ if ((ret = __repmgr_getaddr(dbenv,
+ host, port, AI_PASSIVE, &address_list)) != 0)
+ return (ret);
+
+ /* addr takes ownership of address_list on success. */
+ if ((ret = __repmgr_pack_netaddr(dbenv,
+ sharable_host, port, address_list, &addr)) != 0) {
+ __db_freeaddrinfo(dbenv, address_list);
+ return (ret);
+ }
+
+ /* Take the mutex only if repmgr sync state already exists. */
+ if (REPMGR_SYNC_INITED(db_rep)) {
+ LOCK_MUTEX(db_rep->mutex);
+ locked = TRUE;
+ } else
+ locked = FALSE;
+
+ memcpy(&db_rep->my_addr, &addr, sizeof(addr));
+
+ if (locked)
+ UNLOCK_MUTEX(db_rep->mutex);
+ return (0);
+}
+
+/*
+ * If the application only calls this method from a single thread (e.g., during
+ * its initialization), it will avoid the problems with the non-thread-safe host
+ * name lookup. In any case, if we relegate the blocking lookup to here it
+ * won't affect our select() loop.
+ *
+ * PUBLIC: int __repmgr_add_remote_site __P((DB_ENV *, const char *, u_int,
+ * PUBLIC: int *, u_int32_t));
+ */
+int
+__repmgr_add_remote_site(dbenv, host, port, eidp, flags)
+ DB_ENV *dbenv;
+ const char *host;
+ u_int port;
+ int *eidp;
+ u_int32_t flags;
+{
+ DB_REP *db_rep;
+ REPMGR_SITE *site;
+ int eid, locked, ret;
+
+ /* DB_REPMGR_PEER is the only flag accepted here. */
+ if ((ret = __db_fchk(dbenv,
+ "DB_ENV->repmgr_add_remote_site", flags, DB_REPMGR_PEER)) != 0)
+ return (ret);
+
+ if (host == NULL) {
+ __db_errx(dbenv,
+ "repmgr_add_remote_site: host name is required");
+ return (EINVAL);
+ }
+
+ db_rep = dbenv->rep_handle;
+
+ /* Take the mutex only if repmgr sync state already exists. */
+ if (REPMGR_SYNC_INITED(db_rep)) {
+ LOCK_MUTEX(db_rep->mutex);
+ locked = TRUE;
+ } else
+ locked = FALSE;
+
+ if ((ret = __repmgr_add_site(dbenv, host, port, &site)) != 0)
+ goto unlock;
+ eid = EID_FROM_SITE(site);
+
+ /* Optionally designate this site as our preferred peer. */
+ if (LF_ISSET(DB_REPMGR_PEER))
+ db_rep->peer = eid;
+ if (eidp != NULL)
+ *eidp = eid;
+
+unlock: if (locked)
+ UNLOCK_MUTEX(db_rep->mutex);
+ return (ret);
+}
diff --git a/db/repmgr/repmgr_msg.c b/db/repmgr/repmgr_msg.c
new file mode 100644
index 000000000..ceeeb968d
--- /dev/null
+++ b/db/repmgr/repmgr_msg.c
@@ -0,0 +1,333 @@
+/*-
+ * See the file LICENSE for redistribution information.
+ *
+ * Copyright (c) 2005-2006
+ * Oracle Corporation. All rights reserved.
+ *
+ * $Id: repmgr_msg.c,v 1.23 2006/09/11 15:15:20 bostic Exp $
+ */
+
+#include "db_config.h"
+
+#define __INCLUDE_NETWORKING 1
+#include "db_int.h"
+
+static int message_loop __P((DB_ENV *));
+static int process_message __P((DB_ENV*, DBT*, DBT*, int));
+static int handle_newsite __P((DB_ENV *, const DBT *));
+static int ack_message __P((DB_ENV *, u_int32_t, DB_LSN *));
+
+/*
+ * PUBLIC: void *__repmgr_msg_thread __P((void *));
+ */
+void *
+__repmgr_msg_thread(args)
+ void *args;
+{
+ int ret;
+ DB_ENV *dbenv = args;
+
+ /* Run the message loop; report any fatal failure to repmgr. */
+ ret = message_loop(dbenv);
+ if (ret != 0) {
+ __db_err(dbenv, ret, "message thread failed");
+ __repmgr_thread_failure(dbenv, ret);
+ }
+ return (NULL);
+}
+
+static int
+message_loop(dbenv)
+ DB_ENV *dbenv;
+{
+ REPMGR_MESSAGE *msg;
+ int ret;
+#ifdef DIAGNOSTIC
+ DB_MSGBUF mb;
+#endif
+
+ for (;;) {
+ ret = __repmgr_queue_get(dbenv, &msg);
+ if (ret != 0)
+ break;
+
+ /* Keep retrying for as long as processing deadlocks. */
+ do {
+ ret = process_message(dbenv, &msg->control,
+ &msg->rec, msg->originating_eid);
+ if (ret == DB_LOCK_DEADLOCK)
+ RPRINT(dbenv,
+ (dbenv, &mb, "repmgr deadlock retry"));
+ } while (ret == DB_LOCK_DEADLOCK);
+
+ __os_free(dbenv, msg);
+ if (ret != 0)
+ return (ret);
+ }
+
+ /* DB_REP_UNAVAIL from the queue indicates normal shutdown. */
+ return (ret == DB_REP_UNAVAIL ? 0 : ret);
+}
+
+/*
+ * Feeds one incoming message to the replication engine and reacts to
+ * the returned status code (new site, new master, election needed, etc.).
+ */
+static int
+process_message(dbenv, control, rec, eid)
+ DB_ENV *dbenv;
+ DBT *control, *rec;
+ int eid;
+{
+ DB_REP *db_rep;
+ REP *rep;
+ DB_LSN permlsn;
+ int ret;
+ u_int32_t generation;
+#ifdef DIAGNOSTIC
+ DB_MSGBUF mb;
+#endif
+
+ db_rep = dbenv->rep_handle;
+
+ /*
+ * Save initial generation number, in case it changes in a close race
+ * with a NEWMASTER. See msgdir.10000/10039/msg00086.html.
+ */
+ generation = db_rep->generation;
+
+ switch (ret =
+ __rep_process_message(dbenv, control, rec, &eid, &permlsn)) {
+ case DB_REP_NEWSITE:
+ return (handle_newsite(dbenv, rec));
+
+ case DB_REP_NEWMASTER:
+ db_rep->found_master = TRUE;
+ /* Check if it's us. */
+ if ((db_rep->master_eid = eid) == SELF_EID) {
+ if ((ret = __repmgr_become_master(dbenv)) != 0)
+ return (ret);
+ } else {
+ /*
+ * Since we have no further need for 'eid' throughout
+ * the remainder of this function, it's (relatively)
+ * safe to pass its address directly to the
+ * application. If that were not the case, we could
+ * instead copy it into a scratch variable.
+ */
+ RPRINT(dbenv,
+ (dbenv, &mb, "firing NEWMASTER (%d) event", eid));
+ DB_EVENT(dbenv, DB_EVENT_REP_NEWMASTER, &eid);
+ if ((ret = __repmgr_stash_generation(dbenv)) != 0)
+ return (ret);
+ }
+ break;
+
+ case DB_REP_HOLDELECTION:
+ /* Forget the master and ask the elect thread to run one. */
+ LOCK_MUTEX(db_rep->mutex);
+ db_rep->master_eid = DB_EID_INVALID;
+ ret = __repmgr_init_election(dbenv, ELECT_ELECTION);
+ UNLOCK_MUTEX(db_rep->mutex);
+ if (ret != 0)
+ return (ret);
+ break;
+
+ case DB_REP_DUPMASTER:
+ /* Two masters: step back to client and let things settle. */
+ LOCK_MUTEX(db_rep->mutex);
+ db_rep->master_eid = DB_EID_INVALID;
+ ret = __repmgr_init_election(dbenv, ELECT_REPSTART);
+ UNLOCK_MUTEX(db_rep->mutex);
+ if (ret != 0)
+ return (ret);
+ break;
+
+ case DB_REP_ISPERM:
+ /*
+ * Don't bother sending ack if master doesn't care about it.
+ */
+ rep = db_rep->region;
+ if (db_rep->perm_policy == DB_REPMGR_ACKS_NONE ||
+ (IS_PEER_POLICY(db_rep->perm_policy) &&
+ rep->priority == 0))
+ break;
+
+ if ((ret = ack_message(dbenv, generation, &permlsn)) != 0)
+ return (ret);
+
+ break;
+
+ case DB_REP_NOTPERM: /* FALLTHROUGH */
+ case DB_REP_IGNORE: /* FALLTHROUGH */
+ case DB_LOCK_DEADLOCK: /* FALLTHROUGH */
+ case 0:
+ break;
+ default:
+ __db_err(dbenv, ret, "DB_ENV->rep_process_message");
+ return (ret);
+ }
+ return (0);
+}
+
+/*
+ * Acknowledges a message.
+ */
+static int
+ack_message(dbenv, generation, lsn)
+ DB_ENV *dbenv;
+ u_int32_t generation;
+ DB_LSN *lsn;
+{
+ DB_REP *db_rep;
+ REPMGR_SITE *site;
+ REPMGR_CONNECTION *conn;
+ DB_REPMGR_ACK ack;
+ DBT control2, rec2;
+ int ret;
+#ifdef DIAGNOSTIC
+ DB_MSGBUF mb;
+#endif
+
+ db_rep = dbenv->rep_handle;
+ /*
+ * Regardless of where a message came from, all ack's go to the master
+ * site. If we're not in touch with the master, we drop it, since
+ * there's not much else we can do.
+ */
+ if (!IS_VALID_EID(db_rep->master_eid) ||
+ db_rep->master_eid == SELF_EID) {
+ RPRINT(dbenv, (dbenv, &mb,
+ "dropping ack with master %d", db_rep->master_eid));
+ return (0);
+ }
+
+ ret = 0;
+ LOCK_MUTEX(db_rep->mutex);
+ site = SITE_FROM_EID(db_rep->master_eid);
+ /* Only send if a fully established connection to the master exists. */
+ if (site->state == SITE_CONNECTED &&
+ !F_ISSET(site->ref.conn, CONN_CONNECTING)) {
+
+ /* Build the ack payload: generation plus the perm LSN. */
+ ack.generation = generation;
+ memcpy(&ack.lsn, lsn, sizeof(DB_LSN));
+ control2.data = &ack;
+ control2.size = sizeof(ack);
+ rec2.size = 0;
+
+ conn = site->ref.conn;
+ /* A send failure means the connection is gone; tear it down. */
+ if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_ACK,
+ &control2, &rec2)) == DB_REP_UNAVAIL)
+ ret = __repmgr_bust_connection(dbenv, conn, FALSE);
+ }
+
+ UNLOCK_MUTEX(db_rep->mutex);
+ return (ret);
+}
+
+/*
+ * Does everything necessary to handle the processing of a NEWSITE return.
+ */
+static int
+handle_newsite(dbenv, rec)
+ DB_ENV *dbenv;
+ const DBT *rec;
+{
+ ADDRINFO *ai;
+ DB_REP *db_rep;
+ REPMGR_SITE *site;
+ repmgr_netaddr_t *addr;
+ size_t hlen;
+ u_int16_t port;
+ int ret;
+ char *host;
+#ifdef DIAGNOSTIC
+ DB_MSGBUF mb;
+ SITE_STRING_BUFFER buffer;
+#endif
+
+ db_rep = dbenv->rep_handle;
+ /*
+ * Check if we got sent connect information and if we did, if
+ * this is me or if we already have a connection to this new
+ * site. If we don't, establish a new one.
+ *
+ * Unmarshall the cdata: a 2-byte port number, in network byte order,
+ * followed by the host name string, which should already be
+ * null-terminated, but let's make sure.
+ */
+ if (rec->size < sizeof(port) + 1) {
+ __db_errx(dbenv, "unexpected cdata size, msg ignored");
+ return (0);
+ }
+ memcpy(&port, rec->data, sizeof(port));
+ port = ntohs(port);
+
+ /* NOTE: forcibly NUL-terminates by writing into the DBT's buffer. */
+ host = (char*)((u_int8_t*)rec->data + sizeof(port));
+ hlen = (rec->size - sizeof(port)) - 1;
+ host[hlen] = '\0';
+
+ /* It's me, do nothing. */
+ if (strcmp(host, db_rep->my_addr.host) == 0 &&
+ port == db_rep->my_addr.port) {
+ RPRINT(dbenv, (dbenv, &mb, "repmgr ignores own NEWSITE info"));
+ return (0);
+ }
+
+ LOCK_MUTEX(db_rep->mutex);
+ if ((ret = __repmgr_add_site(dbenv, host, port, &site)) == EEXIST) {
+ RPRINT(dbenv, (dbenv, &mb,
+ "NEWSITE info from %s was already known",
+ __repmgr_format_site_loc(site, buffer)));
+ /*
+ * TODO: test this. Is this really how it works? When
+ * a site comes back on-line, do we really get NEWSITE?
+ * Or is that return code reserved for only the first
+ * time a site joins a group?
+ */
+
+ /*
+ * TODO: it seems like it might be a good idea to move
+ * this site's retry up to the beginning of the queue,
+ * and try it now, on the theory that if it's
+ * generating a NEWSITE, it might have woken up. Does
+ * that pose a problem for my assumption of time-ordered
+ * retry list? I guess not, if we can reorder items.
+ */
+
+ /*
+ * In case we already know about this site only because it
+ * first connected to us, we may not yet have had a chance to
+ * look up its addresses. Even though we don't need them just
+ * now, this is an advantageous opportunity to get them since we
+ * can do so away from the critical select thread.
+ */
+ addr = &site->net_addr;
+ if (addr->address_list == NULL &&
+ __repmgr_getaddr(dbenv,
+ addr->host, addr->port, 0, &ai) == 0)
+ addr->address_list = ai;
+
+ ret = 0;
+ if (site->state == SITE_IDLE) {
+ /*
+ * TODO: yank the retry object up to the front
+ * of the queue, after marking it as due now
+ */
+ } else
+ goto unlock; /* Nothing to do. */
+ } else {
+ RPRINT(dbenv, (dbenv, &mb, "NEWSITE info added %s",
+ __repmgr_format_site_loc(site, buffer)));
+ if (ret != 0)
+ goto unlock;
+ }
+
+ /*
+ * Wake up the main thread to connect to the new or reawakened
+ * site.
+ */
+ ret = __repmgr_wake_main_thread(dbenv);
+
+unlock: UNLOCK_MUTEX(db_rep->mutex);
+ return (ret);
+}
+
+/*
+ * PUBLIC: int __repmgr_stash_generation __P((DB_ENV *));
+ */
+int
+__repmgr_stash_generation(dbenv)
+ DB_ENV *dbenv;
+{
+ DB_REP_STAT *statp;
+ int ret;
+
+ /* Snapshot the current replication generation number. */
+ ret = __rep_stat_pp(dbenv, &statp, 0);
+ if (ret != 0)
+ return (ret);
+
+ dbenv->rep_handle->generation = statp->st_gen;
+ __os_ufree(dbenv, statp);
+ return (0);
+}
diff --git a/db/repmgr/repmgr_net.c b/db/repmgr/repmgr_net.c
new file mode 100644
index 000000000..15402e490
--- /dev/null
+++ b/db/repmgr/repmgr_net.c
@@ -0,0 +1,1041 @@
+/*-
+ * See the file LICENSE for redistribution information.
+ *
+ * Copyright (c) 2005-2006
+ * Oracle Corporation. All rights reserved.
+ *
+ * $Id: repmgr_net.c,v 1.39 2006/09/19 14:14:11 mjc Exp $
+ */
+
+#include "db_config.h"
+
+#define __INCLUDE_NETWORKING 1
+#include "db_int.h"
+
+/*
+ * The functions in this module implement a simple wire protocol for
+ * transmitting messages, both replication messages and our own internal control
+ * messages. The protocol is as follows:
+ *
+ * 1 byte - message type (defined in repmgr_int.h)
+ * 4 bytes - size of control
+ * 4 bytes - size of rec
+ * ? bytes - control
+ * ? bytes - rec
+ *
+ * where both sizes are 32-bit binary integers in network byte order.
+ * Either control or rec can have zero length, but even in this case the
+ * 4-byte length will be present.
+ * Putting both lengths right up at the front allows us to read in fewer
+ * phases, and allows us to allocate buffer space for both parts (plus a wrapper
+ * struct) at once.
+ */
+
+/*
+ * In sending a message, we first try to send it in-line, in the sending thread,
+ * and without first copying the message, by using scatter/gather I/O, using
+ * iovecs to point to the various pieces of the message. If that all works
+ * without blocking, that's optimal.
+ * If we find that, for a particular connection, we can't send without
+ * blocking, then we must copy the message for sending later in the select()
+ * thread. In the course of doing that, we might as well "flatten" the message,
+ * forming one single buffer, to simplify life. Not only that, once we've gone
+ * to the trouble of doing that, other sites to which we also want to send the
+ * message (in the case of a broadcast), may as well take advantage of the
+ * simplified structure also.
+ * This structure holds it all. Note that this structure, and the
+ * "flat_msg" structure, are allocated separately, because (1) the flat_msg
+ * version is usually not needed; and (2) when it is needed, it will need to
+ * live longer than the wrapping sending_msg structure.
+ * Note that, for the broadcast case, where we're going to use this
+ * repeatedly, the iovecs is a template that must be copied, since in normal use
+ * the iovecs pointers and lengths get adjusted after every partial write.
+ *
+ * TODO: this is such an important point that it's probably worth renaming the
+ * iovecs field here to iovecs_template.
+ */
+struct sending_msg {
+	REPMGR_IOVECS iovecs;	/* Template; copied before each send. */
+	u_int8_t type;		/* Message type byte (wire protocol). */
+	/* Sizes of control/rec, in network byte order, for the wire. */
+	u_int32_t control_size_buf, rec_size_buf;
+	REPMGR_FLAT *fmsg;	/* Flattened copy; NULL until needed. */
+};
+
+static int __repmgr_send_broadcast
+ __P((DB_ENV *, const DBT *, const DBT *, u_int *, u_int *));
+static void setup_sending_msg
+ __P((struct sending_msg *, u_int, const DBT *, const DBT *));
+static int __repmgr_send_internal
+ __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *));
+static int enqueue_msg
+ __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, size_t));
+static int flatten __P((DB_ENV *, struct sending_msg *));
+static REPMGR_SITE *__repmgr_available_site __P((DB_ENV *, int));
+
+/*
+ * __repmgr_send --
+ *	The send function for DB_ENV->rep_set_transport.
+ *
+ * Sends to one site or broadcasts, then, for DB_REP_PERMANENT messages,
+ * waits for acknowledgements as required by the configured perm policy.
+ * Returns 0, DB_REP_UNAVAIL when the destination (or enough ack sources)
+ * is not available, or a system error code.
+ *
+ * PUBLIC: int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *,
+ * PUBLIC:    const DB_LSN *, int, u_int32_t));
+ */
+int
+__repmgr_send(dbenv, control, rec, lsnp, eid, flags)
+	DB_ENV *dbenv;
+	const DBT *control, *rec;
+	const DB_LSN *lsnp;
+	int eid;
+	u_int32_t flags;
+{
+	DB_REP *db_rep;
+	u_int nsites, npeers, available, needed;
+	int ret, t_ret;
+	REPMGR_SITE *site;
+	REPMGR_CONNECTION *conn;
+#ifdef DIAGNOSTIC
+	DB_MSGBUF mb;
+#endif
+
+	db_rep = dbenv->rep_handle;
+
+	LOCK_MUTEX(db_rep->mutex);
+	if (eid == DB_EID_BROADCAST) {
+		if ((ret = __repmgr_send_broadcast(dbenv, control, rec,
+		    &nsites, &npeers)) != 0)
+			goto out;
+	} else {
+		/*
+		 * If this is a request that can be sent anywhere, then see if
+		 * we can send it to our peer (to save load on the master), but
+		 * not if it's a rerequest, 'cuz that likely means we tried this
+		 * already and failed.
+		 */
+		if ((flags & (DB_REP_ANYWHERE | DB_REP_REREQUEST)) ==
+		    DB_REP_ANYWHERE &&
+		    IS_VALID_EID(db_rep->peer) &&
+		    (site = __repmgr_available_site(dbenv, db_rep->peer)) !=
+		    NULL) {
+			RPRINT(dbenv, (dbenv, &mb, "sending request to peer"));
+		} else if ((site = __repmgr_available_site(dbenv, eid)) ==
+		    NULL) {
+			RPRINT(dbenv, (dbenv, &mb,
+			    "ignoring message sent to unavailable site"));
+			ret = DB_REP_UNAVAIL;
+			goto out;
+		}
+
+		conn = site->ref.conn;
+		/* On send failure, bust the connection; keep first error. */
+		if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_REP_MESSAGE,
+		    control, rec)) == DB_REP_UNAVAIL &&
+		    (t_ret = __repmgr_bust_connection(dbenv, conn, FALSE)) != 0)
+			ret = t_ret;
+		if (ret != 0)
+			goto out;
+
+		nsites = 1;
+		/* Sites with priority > 0 count as "peers" (electable). */
+		npeers = site->priority > 0 ? 1 : 0;
+	}
+	/*
+	 * Right now, nsites and npeers represent the (maximum) number of sites
+	 * we've attempted to begin sending the message to.  Of course we
+	 * haven't really received any ack's yet.  But since we've only sent to
+	 * nsites/npeers other sites, that's the maximum number of ack's we
+	 * could possibly expect.  If even that number fails to satisfy our PERM
+	 * policy, there's no point waiting for something that will never
+	 * happen.
+	 */
+	if (LF_ISSET(DB_REP_PERMANENT)) {
+		switch (db_rep->perm_policy) {
+		case DB_REPMGR_ACKS_NONE:
+			goto out;
+
+		case DB_REPMGR_ACKS_ONE:
+			needed = 1;
+			available = nsites;
+			break;
+
+		case DB_REPMGR_ACKS_ALL:
+			/* Number of sites in the group besides myself. */
+			needed = __repmgr_get_nsites(db_rep) - 1;
+			available = nsites;
+			break;
+
+		case DB_REPMGR_ACKS_ONE_PEER:
+			needed = 1;
+			available = npeers;
+			break;
+
+		case DB_REPMGR_ACKS_ALL_PEERS:
+			/*
+			 * Too hard to figure out "needed", since we're not
+			 * keeping track of how many peers we have; so just skip
+			 * the optimization in this case.
+			 */
+			needed = 1;
+			available = npeers;
+			break;
+
+		case DB_REPMGR_ACKS_QUORUM:
+			/*
+			 * The minimum number of acks necessary to ensure that
+			 * the transaction is durable if an election is held.
+			 */
+			needed = (__repmgr_get_nsites(db_rep) - 1) / 2;
+			available = npeers;
+			break;
+
+		default:
+			COMPQUIET(available, 0);
+			COMPQUIET(needed, 0);
+			(void)__db_unknown_path(dbenv, "__repmgr_send");
+			break;
+		}
+		if (available < needed) {
+			ret = DB_REP_UNAVAIL;
+			goto out;
+		}
+		/* In ALL_PEERS case, display of "needed" might be confusing. */
+		RPRINT(dbenv, (dbenv, &mb,
+		    "will await acknowledgement: need %u", needed));
+		ret = __repmgr_await_ack(dbenv, lsnp);
+	}
+
+out:	UNLOCK_MUTEX(db_rep->mutex);
+
+	return (ret);
+}
+
+/*
+ * Returns the site for the given EID if it currently has a usable,
+ * fully-established connection; NULL if it is not connected or the
+ * connection is still in progress.
+ */
+static REPMGR_SITE *
+__repmgr_available_site(dbenv, eid)
+	DB_ENV *dbenv;
+	int eid;
+{
+	DB_REP *db_rep;
+	REPMGR_SITE *site;
+
+	db_rep = dbenv->rep_handle;
+	site = SITE_FROM_EID(eid);
+	if (site->state != SITE_CONNECTED)
+		return (NULL);
+
+	/* Connected but still mid-handshake: not yet usable. */
+	if (F_ISSET(site->ref.conn, CONN_CONNECTING))
+		return (NULL);
+	return (site);
+}
+
+/*
+ * Sends message to all sites with which we currently have an active
+ * connection.  Sets result parameters according to how many sites we attempted
+ * to begin sending to, even if we did nothing more than queue it for later
+ * delivery.
+ *
+ * !!!
+ * Caller must hold dbenv->mutex.
+ */
+static int
+__repmgr_send_broadcast(dbenv, control, rec, nsitesp, npeersp)
+	DB_ENV *dbenv;
+	const DBT *control, *rec;
+	u_int *nsitesp, *npeersp;
+{
+	DB_REP *db_rep;
+	struct sending_msg msg;
+	REPMGR_CONNECTION *conn;
+	REPMGR_SITE *site;
+	u_int nsites, npeers;
+	int ret;
+
+	db_rep = dbenv->rep_handle;
+
+	/* Build the iovec template once; reused for every connection. */
+	setup_sending_msg(&msg, REPMGR_REP_MESSAGE, control, rec);
+	nsites = npeers = 0;
+
+	/*
+	 * Traverse the connections list.  Here, even in bust_connection, we
+	 * don't unlink the current list entry, so we can use the TAILQ_FOREACH
+	 * macro.
+	 */
+	TAILQ_FOREACH(conn, &db_rep->connections, entries) {
+		/* Skip half-open, dead, or not-yet-identified connections. */
+		if (F_ISSET(conn, CONN_CONNECTING | CONN_DEFUNCT) ||
+		    !IS_VALID_EID(conn->eid))
+			continue;
+
+		if ((ret = __repmgr_send_internal(dbenv, conn, &msg)) == 0) {
+			site = SITE_FROM_EID(conn->eid);
+			nsites++;
+			if (site->priority > 0)
+				npeers++;
+		} else if (ret == DB_REP_UNAVAIL) {
+			if ((ret = __repmgr_bust_connection(
+			    dbenv, conn, FALSE)) != 0)
+				return (ret);
+		} else
+			return (ret);
+	}
+
+	*nsitesp = nsites;
+	*npeersp = npeers;
+	return (0);
+}
+
+/*
+ * __repmgr_send_one --
+ *	Send a message to a site, or if you can't just yet, make a copy of it
+ * and arrange to have it sent later.  'rec' may be NULL, in which case we send
+ * a zero length and no data.
+ *
+ * If we get an error, we take care of cleaning up the connection (calling
+ * __repmgr_bust_connection()), so that the caller needn't do so.
+ *
+ * !!!
+ * Note that the mutex should be held through this call.
+ * It doubles as a synchronizer to make sure that two threads don't
+ * intersperse writes that are part of two single messages.
+ *
+ * PUBLIC: int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *,
+ * PUBLIC:    u_int, const DBT *, const DBT *));
+ */
+int
+__repmgr_send_one(dbenv, conn, msg_type, control, rec)
+	DB_ENV *dbenv;
+	REPMGR_CONNECTION *conn;
+	u_int msg_type;
+	const DBT *control, *rec;
+{
+	struct sending_msg msg;
+
+	/* Wrap the pieces in an iovec template and hand off. */
+	setup_sending_msg(&msg, msg_type, control, rec);
+	return (__repmgr_send_internal(dbenv, conn, &msg));
+}
+
+/*
+ * Attempts a "best effort" to send a message on the given site.  If there is an
+ * excessive backlog of message already queued on the connection, we simply drop
+ * this message, and still return 0 even in this case.
+ *
+ * Returns DB_REP_UNAVAIL on a hard socket write error; other non-zero
+ * returns are system errors from queueing or waking the select thread.
+ */
+static int
+__repmgr_send_internal(dbenv, conn, msg)
+	DB_ENV *dbenv;
+	REPMGR_CONNECTION *conn;
+	struct sending_msg *msg;
+{
+#define	OUT_QUEUE_LIMIT 10	/* arbitrary, for now */
+	REPMGR_IOVECS iovecs;
+	int ret;
+	size_t nw;
+	size_t total_written;
+#ifdef DIAGNOSTIC
+	DB_MSGBUF mb;
+	SITE_STRING_BUFFER buffer;
+#endif
+
+	DB_ASSERT(dbenv, !F_ISSET(conn, CONN_CONNECTING));
+	if (!STAILQ_EMPTY(&conn->outbound_queue)) {
+		/*
+		 * Output to this site is currently owned by the select()
+		 * thread, so we can't try sending in-line here.  We can only
+		 * queue the msg for later.
+		 */
+		RPRINT(dbenv, (dbenv, &mb, "msg to %s to be queued",
+		    __repmgr_format_eid_loc(dbenv->rep_handle,
+		    conn->eid, buffer)));
+		if (conn->out_queue_length < OUT_QUEUE_LIMIT)
+			return (enqueue_msg(dbenv, conn, msg, 0));
+		else {
+			/* Backlog too deep: silently drop, by design. */
+			RPRINT(dbenv, (dbenv, &mb, "queue limit exceeded"));
+/*			repmgr->stats.tossed_msgs++; */
+			return (0);
+		}
+	}
+
+	/*
+	 * Send as much data to the site as we can, without blocking.  Keep
+	 * writing as long as we're making some progress.  Make a scratch copy
+	 * of iovecs for our use, since we destroy it in the process of
+	 * adjusting pointers after each partial I/O.
+	 */
+	memcpy(&iovecs, &msg->iovecs, sizeof(iovecs));
+	total_written = 0;
+	while ((ret = __repmgr_writev(conn->fd, &iovecs.vectors[iovecs.offset],
+	    iovecs.count-iovecs.offset, &nw)) == 0) {
+		total_written += nw;
+		if (__repmgr_update_consumed(&iovecs, nw)) /* all written */
+			return (0);
+	}
+
+	if (ret != WOULDBLOCK) {
+		__db_err(dbenv, ret, "socket writing failure");
+		return (DB_REP_UNAVAIL);
+	}
+
+	RPRINT(dbenv, (dbenv, &mb, "wrote only %lu bytes to %s",
+	    (u_long)total_written,
+	    __repmgr_format_eid_loc(dbenv->rep_handle, conn->eid, buffer)));
+	/*
+	 * We can't send any more without blocking: queue (a pointer to) a
+	 * "flattened" copy of the message, so that the select() thread will
+	 * finish sending it later.
+	 */
+	if ((ret = enqueue_msg(dbenv, conn, msg, total_written)) != 0)
+		return (ret);
+
+/*	repmgr->stats.partial_retransmissions++; */
+
+	/*
+	 * Wake the main select thread so that it can discover that it has
+	 * received ownership of this connection.  Note that we didn't have to
+	 * do this in the previous case (above), because the non-empty queue
+	 * implies that the select() thread is already managing ownership of
+	 * this connection.
+	 */
+#ifdef DB_WIN32
+	if (WSAEventSelect(conn->fd, conn->event_object,
+	    FD_READ|FD_WRITE|FD_CLOSE) == SOCKET_ERROR) {
+		ret = net_errno;
+		__db_err(dbenv, ret, "can't add FD_WRITE event bit");
+		return (ret);
+	}
+#endif
+	return (__repmgr_wake_main_thread(dbenv));
+}
+
+/*
+ * PUBLIC: int __repmgr_is_permanent __P((DB_ENV *, const DB_LSN *));
+ *
+ * Count up how many sites have ack'ed the given LSN.  Returns TRUE if enough
+ * sites have ack'ed; FALSE otherwise.
+ *
+ * !!!
+ * Caller must hold the mutex.
+ */
+int
+__repmgr_is_permanent(dbenv, lsnp)
+	DB_ENV *dbenv;
+	const DB_LSN *lsnp;
+{
+	DB_REP *db_rep;
+	REPMGR_SITE *site;
+	u_int eid, nsites, npeers;
+	int is_perm, has_missing_peer;
+
+	db_rep = dbenv->rep_handle;
+
+	if (db_rep->perm_policy == DB_REPMGR_ACKS_NONE)
+		return (TRUE);
+
+	nsites = npeers = 0;
+	has_missing_peer = FALSE;
+	for (eid = 0; eid < db_rep->site_cnt; eid++) {
+		site = SITE_FROM_EID(eid);
+		if (site->priority == -1) {
+			/*
+			 * Never connected to this site: since we can't know
+			 * whether it's a peer, assume the worst.
+			 */
+			has_missing_peer = TRUE;
+			continue;
+		}
+
+		if (log_compare(&site->max_ack, lsnp) >= 0) {
+			nsites++;
+			if (site->priority > 0)
+				npeers++;
+		} else {
+			/* This site hasn't ack'ed the message. */
+			if (site->priority > 0)
+				has_missing_peer = TRUE;
+		}
+	}
+
+	switch (db_rep->perm_policy) {
+	case DB_REPMGR_ACKS_ONE:
+		is_perm = (nsites >= 1);
+		break;
+	case DB_REPMGR_ACKS_ONE_PEER:
+		is_perm = (npeers >= 1);
+		break;
+	case DB_REPMGR_ACKS_QUORUM:
+		/*
+		 * The minimum number of acks necessary to ensure that the
+		 * transaction is durable if an election is held (given that we
+		 * always conduct elections according to the standard,
+		 * recommended practice of requiring votes from a majority of
+		 * sites).
+		 */
+		if (__repmgr_get_nsites(db_rep) == 2) {
+			/*
+			 * A group of 2 sites is, as always, a special case.
+			 * For a transaction to be durable the other site has to
+			 * have received it.
+			 */
+			is_perm = (npeers >= 1);
+		} else
+			is_perm = (npeers >= (__repmgr_get_nsites(db_rep)-1)/2);
+		break;
+	case DB_REPMGR_ACKS_ALL:
+		/* Adjust by 1, since get_nsites includes local site. */
+		is_perm = (nsites >= __repmgr_get_nsites(db_rep) - 1);
+		break;
+	case DB_REPMGR_ACKS_ALL_PEERS:
+		if (db_rep->site_cnt < __repmgr_get_nsites(db_rep) - 1) {
+			/* Assume missing site might be a peer. */
+			has_missing_peer = TRUE;
+		}
+		is_perm = !has_missing_peer;
+		break;
+	default:
+		is_perm = FALSE;
+		(void)__db_unknown_path(dbenv, "__repmgr_is_permanent");
+	}
+	return (is_perm);
+}
+
+/*
+ * Abandons a connection, to recover from an error.  Upon entry the conn struct
+ * must be on the connections list.
+ *
+ * If the 'do_close' flag is true, we do the whole job; the clean-up includes
+ * removing the struct from the list and freeing all its memory, so upon return
+ * the caller must not refer to it any further.  Otherwise, we merely mark the
+ * connection for clean-up later by the main thread.
+ *
+ * As side effects: schedules a reconnection attempt for the lost site, and
+ * if the lost site was the master, clears master_eid and starts an election.
+ *
+ * PUBLIC: int __repmgr_bust_connection __P((DB_ENV *,
+ * PUBLIC:    REPMGR_CONNECTION *, int));
+ *
+ * !!!
+ * Caller holds mutex.
+ */
+int
+__repmgr_bust_connection(dbenv, conn, do_close)
+	DB_ENV *dbenv;
+	REPMGR_CONNECTION *conn;
+	int do_close;
+{
+	DB_REP *db_rep;
+	int ret, eid;
+
+	db_rep = dbenv->rep_handle;
+	ret = 0;
+
+	DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
+	/* Save the EID; clean-up below may free or invalidate conn->eid. */
+	eid = conn->eid;
+	if (do_close)
+		__repmgr_cleanup_connection(dbenv, conn);
+	else {
+		F_SET(conn, CONN_DEFUNCT);
+		conn->eid = -1;
+	}
+
+	/*
+	 * When we first accepted the incoming connection, we set conn->eid to
+	 * -1 to indicate that we didn't yet know what site it might be from.
+	 * If we then get here because we later decide it was a redundant
+	 * connection, the following scary stuff will correctly not happen.
+	 */
+	if (IS_VALID_EID(eid)) {
+		/* schedule_connection_attempt wakes the main thread. */
+		if ((ret = __repmgr_schedule_connection_attempt(
+		    dbenv, (u_int)eid, FALSE)) != 0)
+			return (ret);
+
+		if (eid == db_rep->master_eid) {
+			db_rep->master_eid = DB_EID_INVALID;
+
+			if ((ret = __repmgr_init_election(
+			    dbenv, ELECT_FAILURE_ELECTION)) != 0)
+				return (ret);
+		}
+	} else if (!do_close) {
+		/*
+		 * One way or another, make sure the main thread is poked, so
+		 * that we do the deferred clean-up.
+		 */
+		ret = __repmgr_wake_main_thread(dbenv);
+	}
+	return (ret);
+}
+
+/*
+ * Unlinks a connection from the connections list, closes its socket, and
+ * frees all memory attached to it (partial input buffers and any queued
+ * outbound messages) as well as the conn struct itself.
+ *
+ * PUBLIC: void __repmgr_cleanup_connection
+ * PUBLIC:    __P((DB_ENV *, REPMGR_CONNECTION *));
+ */
+void
+__repmgr_cleanup_connection(dbenv, conn)
+	DB_ENV *dbenv;
+	REPMGR_CONNECTION *conn;
+{
+	DB_REP *db_rep;
+	QUEUED_OUTPUT *out;
+	REPMGR_FLAT *msg;
+	DBT *dbt;
+
+	db_rep = dbenv->rep_handle;
+
+	TAILQ_REMOVE(&db_rep->connections, conn, entries);
+	(void)closesocket(conn->fd);
+#ifdef DB_WIN32
+	(void)WSACloseEvent(conn->event_object);
+#endif
+
+	/*
+	 * Deallocate any input and output buffers we may have.
+	 */
+	if (conn->reading_phase == DATA_PHASE) {
+		/* The input union's layout depends on the message type. */
+		if (conn->msg_type == REPMGR_REP_MESSAGE)
+			__os_free(dbenv, conn->input.rep_message);
+		else {
+			dbt = &conn->input.repmgr_msg.cntrl;
+			__os_free(dbenv, dbt->data);
+			dbt = &conn->input.repmgr_msg.rec;
+			if (dbt->size > 0)
+				__os_free(dbenv, dbt->data);
+		}
+	}
+	while (!STAILQ_EMPTY(&conn->outbound_queue)) {
+		out = STAILQ_FIRST(&conn->outbound_queue);
+		STAILQ_REMOVE_HEAD(&conn->outbound_queue, entries);
+		msg = out->msg;
+		/* Flattened messages are shared; free on last reference. */
+		if (--msg->ref_count <= 0)
+			__os_free(dbenv, msg);
+		__os_free(dbenv, out);
+	}
+
+	__os_free(dbenv, conn);
+}
+
+/*
+ * Appends a (reference to a) flattened copy of the message to the
+ * connection's outbound queue, flattening it first if that hasn't been done
+ * yet.  'offset' records how many bytes were already written in-line, so
+ * the select() thread can resume from there.
+ */
+static int
+enqueue_msg(dbenv, conn, msg, offset)
+	DB_ENV *dbenv;
+	REPMGR_CONNECTION *conn;
+	struct sending_msg *msg;
+	size_t offset;
+{
+	QUEUED_OUTPUT *q_element;
+	int ret;
+
+	if (msg->fmsg == NULL && ((ret = flatten(dbenv, msg)) != 0))
+		return (ret);
+	if ((ret = __os_malloc(dbenv, sizeof(QUEUED_OUTPUT), &q_element)) != 0)
+		return (ret);
+	q_element->msg = msg->fmsg;
+	msg->fmsg->ref_count++;	/* encapsulation would be sweeter */
+	q_element->offset = offset;
+
+	/* Put it on the connection's outbound queue. */
+	STAILQ_INSERT_TAIL(&conn->outbound_queue, q_element, entries);
+	conn->out_queue_length++;
+	return (0);
+}
+
+/*
+ * Initializes the sending_msg as an iovec template laying out the wire
+ * format: type byte, control size, rec size, then the payloads.
+ *
+ * The 'rec' DBT can be NULL, in which case we treat it like a zero-length DBT.
+ * But 'control' is always present.
+ */
+static void
+setup_sending_msg(msg, type, control, rec)
+	struct sending_msg *msg;
+	u_int type;
+	const DBT *control, *rec;
+{
+	u_int32_t rec_size;
+
+	/*
+	 * The wire protocol is documented in a comment at the top of this
+	 * module.
+	 */
+	__repmgr_iovec_init(&msg->iovecs);
+	msg->type = type;
+	__repmgr_add_buffer(&msg->iovecs, &msg->type, sizeof(msg->type));
+
+	/* Sizes travel in network byte order. */
+	msg->control_size_buf = htonl(control->size);
+	__repmgr_add_buffer(&msg->iovecs,
+	    &msg->control_size_buf, sizeof(msg->control_size_buf));
+
+	rec_size = rec == NULL ? 0 : rec->size;
+	msg->rec_size_buf = htonl(rec_size);
+	__repmgr_add_buffer(
+	    &msg->iovecs, &msg->rec_size_buf, sizeof(msg->rec_size_buf));
+
+	if (control->size > 0)
+		__repmgr_add_dbt(&msg->iovecs, control);
+
+	if (rec_size > 0)
+		__repmgr_add_dbt(&msg->iovecs, rec);
+
+	/* No flattened copy yet; created lazily by flatten(). */
+	msg->fmsg = NULL;
+}
+
+/*
+ * Convert a message stored as iovec pointers to various pieces, into flattened
+ * form, by copying all the pieces, and then make the iovec just point to the
+ * new simplified form.
+ *
+ * The REPMGR_FLAT is allocated with its data inline (header plus payload in
+ * one malloc); its ref_count starts at 0 and is bumped by enqueue_msg().
+ */
+static int
+flatten(dbenv, msg)
+	DB_ENV *dbenv;
+	struct sending_msg *msg;
+{
+	u_int8_t *p;
+	size_t msg_size;
+	int i, ret;
+
+	DB_ASSERT(dbenv, msg->fmsg == NULL);
+
+	msg_size = msg->iovecs.total_bytes;
+	if ((ret = __os_malloc(dbenv, sizeof(*msg->fmsg) + msg_size,
+	    &msg->fmsg)) != 0)
+		return (ret);
+	msg->fmsg->length = msg_size;
+	msg->fmsg->ref_count = 0;
+	p = &msg->fmsg->data[0];
+
+	for (i = 0; i < msg->iovecs.count; i++) {
+		memcpy(p, msg->iovecs.vectors[i].iov_base,
+		    msg->iovecs.vectors[i].iov_len);
+		p = &p[msg->iovecs.vectors[i].iov_len];
+	}
+	/* Re-point the iovecs at the single flattened buffer. */
+	__repmgr_iovec_init(&msg->iovecs);
+	__repmgr_add_buffer(&msg->iovecs, &msg->fmsg->data[0], msg_size);
+	return (0);
+}
+
+/*
+ * Linear search of the known-sites array for a matching host/port pair.
+ * Returns the site's EID (its array index), or -1 if not found.
+ *
+ * PUBLIC: int __repmgr_find_site __P((DB_ENV *, const char *, u_int));
+ */
+int
+__repmgr_find_site(dbenv, host, port)
+	DB_ENV *dbenv;
+	const char *host;
+	u_int port;
+{
+	DB_REP *db_rep;
+	REPMGR_SITE *site;
+	u_int i;
+
+	db_rep = dbenv->rep_handle;
+	for (i = 0; i < db_rep->site_cnt; i++) {
+		site = &db_rep->sites[i];
+
+		if (strcmp(site->net_addr.host, host) == 0 &&
+		    site->net_addr.port == port)
+			return ((int)i);
+	}
+
+	return (-1);
+}
+
+/*
+ * Stash a copy of the given host name and port number into a convenient data
+ * structure so that we can save it permanently.  This is kind of like a
+ * constructor for a netaddr object, except that the caller supplies the memory
+ * for the base struct (though not the subordinate attachments).
+ *
+ * All inputs are assumed to have been already validated.
+ *
+ * On success the netaddr owns a strdup'ed copy of 'host' and takes over
+ * ownership of 'list'.
+ *
+ * PUBLIC: int __repmgr_pack_netaddr __P((DB_ENV *, const char *,
+ * PUBLIC:    u_int, ADDRINFO *, repmgr_netaddr_t *));
+ */
+int
+__repmgr_pack_netaddr(dbenv, host, port, list, addr)
+	DB_ENV *dbenv;
+	const char *host;
+	u_int port;
+	ADDRINFO *list;
+	repmgr_netaddr_t *addr;
+{
+	int ret;
+
+	DB_ASSERT(dbenv, host != NULL);
+
+	if ((ret = __os_strdup(dbenv, host, &addr->host)) != 0)
+		return (ret);
+	addr->port = (u_int16_t)port;
+	addr->address_list = list;
+	addr->current = NULL;
+	return (0);
+}
+
+/*
+ * Resolves host/port to an address list via getaddrinfo (TCP, any address
+ * family).  On success, stores the list (caller frees with
+ * __db_freeaddrinfo) in *result.  Returns EINVAL for out-of-range ports.
+ *
+ * PUBLIC: int __repmgr_getaddr __P((DB_ENV *,
+ * PUBLIC:    const char *, u_int, int, ADDRINFO **));
+ */
+int
+__repmgr_getaddr(dbenv, host, port, flags, result)
+	DB_ENV *dbenv;
+	const char *host;
+	u_int port;
+	int flags; /* Matches struct addrinfo declaration. */
+	ADDRINFO **result;
+{
+	ADDRINFO *answer, hints;
+	int ret;
+	char buffer[10];	/* 2**16 fits in 5 digits. */
+
+	/*
+	 * Ports are really 16-bit unsigned values, but it's too painful to
+	 * push that type through the API.
+	 */
+	if (port > UINT16_MAX) {
+		__db_errx(dbenv, "port %u larger than max port %u",
+		    port, UINT16_MAX);
+		return (EINVAL);
+	}
+
+#ifdef DB_WIN32
+	/* Winsock must be initialized before any name resolution. */
+	if (!dbenv->rep_handle->wsa_inited &&
+	    (ret = __repmgr_wsa_init(dbenv)) != 0)
+		return (ret);
+#endif
+
+	memset(&hints, 0, sizeof(hints));
+	hints.ai_family = AF_UNSPEC;
+	hints.ai_socktype = SOCK_STREAM;
+	hints.ai_flags = flags;
+	/* Port also passed as the decimal "service" string. */
+	(void)snprintf(buffer, sizeof(buffer), "%u", port);
+	if ((ret =
+	    __db_getaddrinfo(dbenv, host, port, buffer, &hints, &answer)) != 0)
+		return (ret);
+	*result = answer;
+
+	return (0);
+}
+
+/*
+ * Adds a new site to our array of known sites (unless it already exists),
+ * and schedules it for immediate connection attempt.  Whether it exists or not,
+ * we set newsitep, either to the already existing site, or to the newly created
+ * site.  Unless newsitep is passed in as NULL, which is allowed.
+ *
+ * Returns 0, EEXIST (with *newsitep still set), or a system error.
+ *
+ * PUBLIC: int __repmgr_add_site
+ * PUBLIC:    __P((DB_ENV *, const char *, u_int, REPMGR_SITE **));
+ *
+ * !!!
+ * Caller is expected to hold the mutex.
+ */
+int
+__repmgr_add_site(dbenv, host, port, newsitep)
+	DB_ENV *dbenv;
+	const char *host;
+	u_int port;
+	REPMGR_SITE **newsitep;
+{
+	DB_REP *db_rep;
+	ADDRINFO *address_list;
+	repmgr_netaddr_t addr;
+	REPMGR_SITE *site;
+	int ret, eid;
+
+	ret = 0;
+	db_rep = dbenv->rep_handle;
+
+	if (IS_VALID_EID(eid = __repmgr_find_site(dbenv, host, port))) {
+		site = SITE_FROM_EID(eid);
+		ret = EEXIST;
+		goto out;
+	}
+
+	if ((ret = __repmgr_getaddr(dbenv, host, port, 0, &address_list)) != 0)
+		return (ret);
+
+	/* From here on, failure paths must release what was acquired. */
+	if ((ret = __repmgr_pack_netaddr(
+	    dbenv, host, port, address_list, &addr)) != 0) {
+		__db_freeaddrinfo(dbenv, address_list);
+		return (ret);
+	}
+
+	if ((ret = __repmgr_new_site(dbenv, &site, &addr, SITE_IDLE)) != 0) {
+		__repmgr_cleanup_netaddr(dbenv, &addr);
+		return (ret);
+	}
+
+	/* Only try connecting if the select thread is already running. */
+	if (db_rep->selector != NULL &&
+	    (ret = __repmgr_schedule_connection_attempt(
+	    dbenv, (u_int)EID_FROM_SITE(site), TRUE)) != 0)
+		return (ret);
+
+	/* Note that we should only come here for success and EEXIST. */
+out:
+	if (newsitep != NULL)
+		*newsitep = site;
+	return (ret);
+}
+
+/*
+ * Initializes net-related memory in the db_rep handle.
+ *
+ * Sets sentinel values (no listen socket, no known master) and empties the
+ * connection and retry lists.  Always returns 0.
+ *
+ * PUBLIC: int __repmgr_net_create __P((DB_ENV *, DB_REP *));
+ */
+int
+__repmgr_net_create(dbenv, db_rep)
+	DB_ENV *dbenv;
+	DB_REP *db_rep;
+{
+	COMPQUIET(dbenv, NULL);
+
+	db_rep->listen_fd = INVALID_SOCKET;
+	db_rep->master_eid = DB_EID_INVALID;
+
+	TAILQ_INIT(&db_rep->connections);
+	TAILQ_INIT(&db_rep->retries);
+
+	return (0);
+}
+
+/*
+ * listen_socket_init --
+ *	Initialize a socket for listening.  Sets
+ * a file descriptor for the socket, ready for an accept() call
+ * in a thread that we're happy to let block.
+ *
+ * Tries each resolved address for my_addr in turn until one can be bound
+ * and listened on; the socket is made non-blocking and stored in
+ * db_rep->listen_fd.
+ *
+ * PUBLIC: int __repmgr_listen __P((DB_ENV *));
+ */
+int
+__repmgr_listen(dbenv)
+	DB_ENV *dbenv;
+{
+	DB_REP *db_rep;
+	ADDRINFO *ai;
+	char *why;
+	int sockopt, ret;
+	socket_t s;
+
+	db_rep = dbenv->rep_handle;
+
+	/* Use OOB value as sentinel to show no socket open. */
+	s = INVALID_SOCKET;
+	ai = ADDR_LIST_FIRST(&db_rep->my_addr);
+
+	/*
+	 * Given the assert is correct, we execute the loop at least once, which
+	 * means 'why' will have been set by the time it's needed.  But I guess
+	 * lint doesn't know about DB_ASSERT.
+	 */
+	COMPQUIET(why, "");
+	DB_ASSERT(dbenv, ai != NULL);
+	for (; ai != NULL; ai = ADDR_LIST_NEXT(&db_rep->my_addr)) {
+
+		if ((s = socket(ai->ai_family,
+		    ai->ai_socktype, ai->ai_protocol)) == INVALID_SOCKET) {
+			why = "can't create listen socket";
+			continue;
+		}
+
+		/*
+		 * When testing, it's common to kill and restart regularly.  On
+		 * some systems, this causes bind to fail with "address in use"
+		 * errors unless this option is set.
+		 */
+		sockopt = 1;
+		if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (sockopt_t)&sockopt,
+		    sizeof(sockopt)) != 0) {
+			why = "can't set REUSEADDR socket option";
+			break;
+		}
+
+		if (bind(s, ai->ai_addr, (socklen_t)ai->ai_addrlen) != 0) {
+			why = "can't bind socket to listening address";
+			(void)closesocket(s);
+			s = INVALID_SOCKET;
+			continue;
+		}
+
+		if (listen(s, 5) != 0) {
+			why = "listen()";
+			break;
+		}
+
+		if ((ret = __repmgr_set_nonblocking(s)) != 0) {
+			__db_err(dbenv, ret, "can't unblock listen socket");
+			goto clean;
+		}
+
+		db_rep->listen_fd = s;
+		return (0);
+	}
+
+	ret = net_errno;
+	/*
+	 * NOTE(review): 'why' is a non-literal format string here; safe only
+	 * while none of the messages above contain '%' -- confirm.
+	 */
+	__db_err(dbenv, ret, why);
+clean:	if (s != INVALID_SOCKET)
+		(void)closesocket(s);
+	return (ret);
+}
+
+/*
+ * Closes the listening socket (if open) and undoes platform-level network
+ * setup: on Windows, shuts down Winsock; on POSIX, restores the default
+ * SIGPIPE disposition if we had changed it.  Returns the first error seen.
+ *
+ * PUBLIC: int __repmgr_net_close __P((DB_ENV *));
+ */
+int
+__repmgr_net_close(dbenv)
+	DB_ENV *dbenv;
+{
+	DB_REP *db_rep;
+#ifndef DB_WIN32
+	struct sigaction sigact;
+#endif
+	int ret;
+
+	db_rep = dbenv->rep_handle;
+	/* Listen fd doubles as the "was networking started" flag. */
+	if (db_rep->listen_fd == INVALID_SOCKET)
+		return (0);
+
+	ret = 0;
+
+	if (closesocket(db_rep->listen_fd) == SOCKET_ERROR)
+		ret = net_errno;
+
+#ifdef DB_WIN32
+	/* Shut down the Windows sockets DLL. */
+	if (WSACleanup() == SOCKET_ERROR && ret == 0)
+		ret = net_errno;
+	db_rep->wsa_inited = FALSE;
+#else
+	/* Restore original SIGPIPE handling configuration. */
+	if (db_rep->chg_sig_handler) {
+		memset(&sigact, 0, sizeof(sigact));
+		sigact.sa_handler = SIG_DFL;
+		if (sigaction(SIGPIPE, &sigact, NULL) == -1 && ret == 0)
+			ret = errno;
+	}
+#endif
+	db_rep->listen_fd = INVALID_SOCKET;
+	return (ret);
+}
+
+/*
+ * Frees all net-related memory hanging off the db_rep handle: pending
+ * retries, remaining connections, per-site address data, and the sites
+ * array itself.  Idempotent: a NULL sites array means nothing to do.
+ *
+ * PUBLIC: void __repmgr_net_destroy __P((DB_ENV *, DB_REP *));
+ */
+void
+__repmgr_net_destroy(dbenv, db_rep)
+	DB_ENV *dbenv;
+	DB_REP *db_rep;
+{
+	REPMGR_CONNECTION *conn;
+	REPMGR_RETRY *retry;
+	REPMGR_SITE *site;
+	u_int i;
+
+	if (db_rep->sites == NULL)
+		return;
+
+	/*
+	 * TODO: I think maybe these, and especially connections, should be
+	 * cleaned up in close(), 'cuz there's more than just memory there.
+	 * Does it matter, I wonder?
+	 */
+	while (!TAILQ_EMPTY(&db_rep->retries)) {
+		retry = TAILQ_FIRST(&db_rep->retries);
+		TAILQ_REMOVE(&db_rep->retries, retry, entries);
+		__os_free(dbenv, retry);
+	}
+
+	/* cleanup_connection unlinks each entry, so loop until empty. */
+	while (!TAILQ_EMPTY(&db_rep->connections)) {
+		conn = TAILQ_FIRST(&db_rep->connections);
+		__repmgr_cleanup_connection(dbenv, conn);
+	}
+
+	for (i = 0; i < db_rep->site_cnt; i++) {
+		site = &db_rep->sites[i];
+		__repmgr_cleanup_netaddr(dbenv, &site->net_addr);
+	}
+	__os_free(dbenv, db_rep->sites);
+	db_rep->sites = NULL;
+}
diff --git a/db/repmgr/repmgr_posix.c b/db/repmgr/repmgr_posix.c
new file mode 100644
index 000000000..1a04fef25
--- /dev/null
+++ b/db/repmgr/repmgr_posix.c
@@ -0,0 +1,714 @@
+/*-
+ * See the file LICENSE for redistribution information.
+ *
+ * Copyright (c) 2005-2006
+ * Oracle Corporation. All rights reserved.
+ *
+ * $Id: repmgr_posix.c,v 1.22 2006/09/11 15:15:20 bostic Exp $
+ */
+
+#include "db_config.h"
+
+#define __INCLUDE_NETWORKING 1
+#define __INCLUDE_SELECT_H 1
+#include "db_int.h"
+
+/*
+ * A very rough guess at the maximum stack space one of our threads could ever
+ * need, which we hope is plenty conservative. This can be patched in the field
+ * if necessary.
+ */
+#ifdef _POSIX_THREAD_ATTR_STACKSIZE
+size_t __repmgr_guesstimated_max = (128 * 1024);
+#endif
+
+static int finish_connecting __P((DB_ENV *, REPMGR_CONNECTION *));
+
+/*
+ * Starts the thread described in the argument, and stores the resulting thread
+ * ID therein.
+ *
+ * PUBLIC: int __repmgr_thread_start __P((DB_ENV *, REPMGR_RUNNABLE *));
+ */
+int
+__repmgr_thread_start(dbenv, runnable)
+ DB_ENV *dbenv;
+ REPMGR_RUNNABLE *runnable;
+{
+ pthread_attr_t *attrp;
+#ifdef _POSIX_THREAD_ATTR_STACKSIZE
+ pthread_attr_t attributes;
+ size_t size;
+ int ret;
+#endif
+
+ runnable->finished = FALSE;
+
+#ifdef _POSIX_THREAD_ATTR_STACKSIZE
+ attrp = &attributes;
+ if ((ret = pthread_attr_init(&attributes)) != 0) {
+ __db_err(dbenv,
+ ret, "pthread_attr_init in repmgr_thread_start");
+ return (ret);
+ }
+
+ /*
+ * On a 64-bit machine it seems reasonable that we could need twice as
+ * much stack space as we did on a 32-bit machine.
+ */
+ size = __repmgr_guesstimated_max;
+ if (sizeof(size_t) > 4)
+ size *= 2;
+#ifdef PTHREAD_STACK_MIN
+ if (size < PTHREAD_STACK_MIN)
+ size = PTHREAD_STACK_MIN;
+#endif
+ if ((ret = pthread_attr_setstacksize(&attributes, size)) != 0) {
+ __db_err(dbenv,
+ ret, "pthread_attr_setstacksize in repmgr_thread_start");
+ return (ret);
+ }
+#else
+ attrp = NULL;
+#endif
+
+ return (pthread_create(&runnable->thread_id, attrp,
+ runnable->run, dbenv));
+}
+
+/*
+ * PUBLIC: int __repmgr_thread_join __P((REPMGR_RUNNABLE *));
+ */
+int
+__repmgr_thread_join(thread)
+ REPMGR_RUNNABLE *thread;
+{
+ return (pthread_join(thread->thread_id, NULL));
+}
+
+/*
+ * PUBLIC: int __repmgr_set_nonblocking __P((socket_t));
+ */
+int __repmgr_set_nonblocking(fd)
+ socket_t fd;
+{
+ int flags;
+
+ if ((flags = fcntl(fd, F_GETFL, 0)) < 0)
+ return (errno);
+ if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0)
+ return (errno);
+ return (0);
+}
+
+/*
+ * PUBLIC: int __repmgr_wake_waiting_senders __P((DB_ENV *));
+ *
+ * Wake any send()-ing threads waiting for an acknowledgement.
+ *
+ * !!!
+ * Caller must hold the db_rep->mutex, if this thread synchronization is to work
+ * properly.
+ */
+int
+__repmgr_wake_waiting_senders(dbenv)
+ DB_ENV *dbenv;
+{
+ return (pthread_cond_broadcast(&dbenv->rep_handle->ack_condition));
+}
+
/*
 * PUBLIC: int __repmgr_await_ack __P((DB_ENV *, const DB_LSN *));
 *
 * Waits (a limited time) for configured number of remote sites to ack the given
 * LSN.
 *
 * !!!
 * Caller must hold repmgr->mutex (TODO: although that seems a shame, with all
 * that deadline calculation).
 */
int
__repmgr_await_ack(dbenv, lsnp)
	DB_ENV *dbenv;
	const DB_LSN *lsnp;
{
	DB_REP *db_rep;
	struct timespec deadline;
	int ret, timed;

	db_rep = dbenv->rep_handle;

	/* An ack_timeout of zero means "wait indefinitely". */
	if ((timed = (db_rep->ack_timeout > 0)))
		__repmgr_compute_wait_deadline(dbenv, &deadline,
		    db_rep->ack_timeout);
	else
		COMPQUIET(deadline.tv_sec, 0);

	/* Sleep until enough sites have acked, or shutdown, or timeout. */
	while (!__repmgr_is_permanent(dbenv, lsnp)) {
		if (timed)
			ret = pthread_cond_timedwait(&db_rep->ack_condition,
			    &db_rep->mutex, &deadline);
		else
			ret = pthread_cond_wait(&db_rep->ack_condition,
			    &db_rep->mutex);
		/* Shutdown takes precedence over any wait outcome. */
		if (db_rep->finished)
			return (DB_REP_UNAVAIL);
		/* Note: a timed-out wait surfaces here as ETIMEDOUT. */
		if (ret != 0)
			return (ret);	/* TODO: but first check if we need (to
					 * create) a panic */
	}
	return (0);
}
+
+/*
+ * Computes a deadline time a certain distance into the future, in a form
+ * suitable for the pthreads timed wait operation. Curiously, that call uses
+ * nano-second resolution; elsewhere we use microseconds.
+ *
+ * PUBLIC: void __repmgr_compute_wait_deadline __P((DB_ENV*,
+ * PUBLIC: struct timespec *, db_timeout_t));
+ */
+void
+__repmgr_compute_wait_deadline(dbenv, result, wait)
+ DB_ENV *dbenv;
+ struct timespec *result;
+ db_timeout_t wait;
+{
+ u_int32_t secs, usecs;
+
+ /*
+ * Start with "now"; then add the "wait" offset.
+ */
+ __os_clock(dbenv, &secs, &usecs);
+
+ if (wait > 1000000) {
+ secs += wait / 1000000;
+ usecs += wait % 1000000;
+ } else
+ usecs += wait;
+
+ if (usecs > 1000000) {
+ secs++;
+ usecs -= 1000000;
+ }
+
+ result->tv_sec = (time_t)secs;
+ result->tv_nsec = (long)(usecs * 1000);
+}
+
+/*
+ * PUBLIC: int __repmgr_init_sync __P((DB_ENV *, DB_REP *));
+ *
+ * Allocate/initialize all data necessary for thread synchronization. This
+ * should be an all-or-nothing affair. Other than here and in _close_sync there
+ * should never be a time when these resources aren't either all allocated or
+ * all freed. If that's true, then we can safely use the values of the file
+ * descriptor(s) to keep track of which it is.
+ */
+int
+__repmgr_init_sync(dbenv, db_rep)
+ DB_ENV *dbenv;
+ DB_REP *db_rep;
+{
+ int ret, mutex_inited, ack_inited, elect_inited, queue_inited,
+ file_desc[2];
+
+ COMPQUIET(dbenv, NULL);
+
+ mutex_inited = ack_inited = elect_inited = queue_inited = FALSE;
+
+ if ((ret = pthread_mutex_init(&db_rep->mutex, NULL)) != 0)
+ goto err;
+ mutex_inited = TRUE;
+
+ if ((ret = pthread_cond_init(&db_rep->ack_condition, NULL)) != 0)
+ goto err;
+ ack_inited = TRUE;
+
+ if ((ret = pthread_cond_init(&db_rep->check_election, NULL)) != 0)
+ goto err;
+ elect_inited = TRUE;
+
+ if ((ret = pthread_cond_init(&db_rep->queue_nonempty, NULL)) != 0)
+ goto err;
+ queue_inited = TRUE;
+
+ if ((ret = pipe(file_desc)) == -1) {
+ ret = errno;
+ goto err;
+ }
+
+ db_rep->read_pipe = file_desc[0];
+ db_rep->write_pipe = file_desc[1];
+ return (0);
+err:
+ if (queue_inited)
+ (void)pthread_cond_destroy(&db_rep->queue_nonempty);
+ if (elect_inited)
+ (void)pthread_cond_destroy(&db_rep->check_election);
+ if (ack_inited)
+ (void)pthread_cond_destroy(&db_rep->ack_condition);
+ if (mutex_inited)
+ (void)pthread_mutex_destroy(&db_rep->mutex);
+ db_rep->read_pipe = db_rep->write_pipe = -1;
+
+ return (ret);
+}
+
+/*
+ * PUBLIC: int __repmgr_close_sync __P((DB_ENV *));
+ *
+ * Frees the thread synchronization data within a repmgr struct, in a
+ * platform-specific way.
+ */
+int
+__repmgr_close_sync(dbenv)
+ DB_ENV *dbenv;
+{
+ DB_REP *db_rep;
+ int ret, t_ret;
+
+ db_rep = dbenv->rep_handle;
+
+ if (!(REPMGR_SYNC_INITED(db_rep)))
+ return (0);
+
+ ret = pthread_cond_destroy(&db_rep->queue_nonempty);
+
+ if ((t_ret = pthread_cond_destroy(&db_rep->check_election)) != 0 &&
+ ret == 0)
+ ret = t_ret;
+
+ if ((t_ret = pthread_cond_destroy(&db_rep->ack_condition)) != 0 &&
+ ret == 0)
+ ret = t_ret;
+
+ if ((t_ret = pthread_mutex_destroy(&db_rep->mutex)) != 0 &&
+ ret == 0)
+ ret = t_ret;
+
+ if (close(db_rep->read_pipe) == -1 && ret == 0)
+ ret = errno;
+ if (close(db_rep->write_pipe) == -1 && ret == 0)
+ ret = errno;
+
+ db_rep->read_pipe = db_rep->write_pipe = -1;
+ return (ret);
+}
+
+/*
+ * Performs net-related resource initialization other than memory initialization
+ * and allocation. A valid db_rep->listen_fd acts as the "all-or-nothing"
+ * sentinel signifying that these resources are allocated.
+ *
+ * PUBLIC: int __repmgr_net_init __P((DB_ENV *, DB_REP *));
+ */
+int
+__repmgr_net_init(dbenv, db_rep)
+ DB_ENV *dbenv;
+ DB_REP *db_rep;
+{
+ int ret;
+ struct sigaction sigact;
+
+ if ((ret = __repmgr_listen(dbenv)) != 0)
+ return (ret);
+
+ /*
+ * Make sure we're not ignoring SIGPIPE, 'cuz otherwise we'd be killed
+ * just for trying to write onto a socket that had been reset.
+ */
+ if (sigaction(SIGPIPE, NULL, &sigact) == -1) {
+ ret = errno;
+ __db_err(dbenv, ret, "can't access signal handler");
+ goto err;
+ }
+ /*
+ * If we need to change the sig handler, do so, and also set a flag so
+ * that we remember we did.
+ */
+ if ((db_rep->chg_sig_handler = (sigact.sa_handler == SIG_DFL))) {
+ sigact.sa_handler = SIG_IGN;
+ sigact.sa_flags = 0;
+ if (sigaction(SIGPIPE, &sigact, NULL) == -1) {
+ ret = errno;
+ __db_err(dbenv, ret, "can't access signal handler");
+ goto err;
+ }
+ }
+ return (0);
+
+err:
+ (void)closesocket(db_rep->listen_fd);
+ db_rep->listen_fd = INVALID_SOCKET;
+ return (ret);
+}
+
+/*
+ * PUBLIC: int __repmgr_lock_mutex __P((mgr_mutex_t *));
+ */
+int
+__repmgr_lock_mutex(mutex)
+ mgr_mutex_t *mutex;
+{
+ return (pthread_mutex_lock(mutex));
+}
+
+/*
+ * PUBLIC: int __repmgr_unlock_mutex __P((mgr_mutex_t *));
+ */
+int
+__repmgr_unlock_mutex(mutex)
+ mgr_mutex_t *mutex;
+{
+ return (pthread_mutex_unlock(mutex));
+}
+
+/*
+ * Signals a condition variable.
+ *
+ * !!!
+ * Caller must hold mutex.
+ *
+ * PUBLIC: int __repmgr_signal __P((cond_var_t *));
+ */
+int
+__repmgr_signal(v)
+ cond_var_t *v;
+{
+ return (pthread_cond_broadcast(v));
+}
+
+/*
+ * PUBLIC: int __repmgr_wake_main_thread __P((DB_ENV*));
+ */
+int
+__repmgr_wake_main_thread(dbenv)
+ DB_ENV *dbenv;
+{
+ DB_REP *db_rep;
+ u_int8_t any_value;
+
+ COMPQUIET(any_value, 0);
+ db_rep = dbenv->rep_handle;
+
+ /*
+ * It doesn't matter what byte value we write. Just the appearance of a
+ * byte in the stream is enough to wake up the select() thread reading
+ * the pipe.
+ */
+ if (write(db_rep->write_pipe, &any_value, 1) == -1)
+ return (errno);
+ return (0);
+}
+
+/*
+ * PUBLIC: int __repmgr_writev __P((socket_t, db_iovec_t *, int, size_t *));
+ */
+int
+__repmgr_writev(fd, iovec, buf_count, byte_count_p)
+ socket_t fd;
+ db_iovec_t *iovec;
+ int buf_count;
+ size_t *byte_count_p;
+{
+ int nw;
+
+ if ((nw = writev(fd, iovec, buf_count)) == -1)
+ return (errno);
+ *byte_count_p = (size_t)nw;
+ return (0);
+}
+
+/*
+ * PUBLIC: int __repmgr_readv __P((socket_t, db_iovec_t *, int, size_t *));
+ */
+int
+__repmgr_readv(fd, iovec, buf_count, byte_count_p)
+ socket_t fd;
+ db_iovec_t *iovec;
+ int buf_count;
+ size_t *byte_count_p;
+{
+ ssize_t nw;
+
+ if ((nw = readv(fd, iovec, buf_count)) == -1)
+ return (errno);
+ *byte_count_p = (size_t)nw;
+ return (0);
+}
+
+/*
+ * Calculate the time duration from now til "when", in the form of a struct
+ * timeval (suitable for select()), clipping the result at 0 (i.e., avoid a
+ * negative result).
+ *
+ * PUBLIC: void __repmgr_timeval_diff_current
+ * PUBLIC: __P((DB_ENV *, repmgr_timeval_t *, select_timeout_t *));
+ */
+void
+__repmgr_timeval_diff_current(dbenv, when, result)
+ DB_ENV *dbenv;
+ repmgr_timeval_t *when;
+ select_timeout_t *result;
+{
+ repmgr_timeval_t now;
+
+ __os_clock(dbenv, &now.tv_sec, &now.tv_usec);
+ if (__repmgr_timeval_cmp(when, &now) <= 0)
+ result->tv_sec = result->tv_usec = 0;
+ else {
+ /*
+ * Do the arithmetic; first see if we need to "borrow".
+ */
+ if (when->tv_usec < now.tv_usec) {
+ when->tv_usec += 1000000;
+ when->tv_sec--;
+ }
+ result->tv_usec = (long)(when->tv_usec - now.tv_usec);
+ result->tv_sec = (time_t)(when->tv_sec - now.tv_sec);
+ }
+}
+
/*
 * Main event loop of the select() thread: multiplexes I/O across the
 * listening socket, the self-signalling pipe, and every site connection,
 * and drives connection-retry scheduling.  Returns 0 on orderly shutdown
 * (the "finished" flag), or an error code.
 *
 * PUBLIC: int __repmgr_select_loop __P((DB_ENV *));
 */
int
__repmgr_select_loop(dbenv)
	DB_ENV *dbenv;
{
	DB_REP *db_rep;
	REPMGR_CONNECTION *conn, *next;
	REPMGR_RETRY *retry;
	select_timeout_t timeout, *timeout_p;
	fd_set reads, writes;
	int ret, flow_control, maxfd, nready;
	u_int8_t buf[10];	/* arbitrary size */

	/* TODO: turn this on when the input queue gets too big. */
	flow_control = FALSE;

	db_rep = dbenv->rep_handle;
	/*
	 * Almost this entire thread operates while holding the mutex.  But note
	 * that it never blocks, except in the call to select() (which is the
	 * one place we relinquish the mutex).
	 */
	LOCK_MUTEX(db_rep->mutex);
	if ((ret = __repmgr_first_try_connections(dbenv)) != 0)
		goto out;
	for (;;) {
		FD_ZERO(&reads);
		FD_ZERO(&writes);

		/*
		 * Always ask for input on listening socket and signalling
		 * pipe.
		 */
		FD_SET((u_int)db_rep->listen_fd, &reads);
		maxfd = db_rep->listen_fd;

		FD_SET((u_int)db_rep->read_pipe, &reads);
		if (db_rep->read_pipe > maxfd)
			maxfd = db_rep->read_pipe;

		/*
		 * Examine all connections to see what sort of I/O to ask for on
		 * each one.  The TAILQ_FOREACH macro would be suitable here,
		 * except that it doesn't allow unlinking the current element,
		 * which is needed for cleanup_connection.
		 */
		for (conn = TAILQ_FIRST(&db_rep->connections);
		    conn != NULL;
		    conn = next) {
			next = TAILQ_NEXT(conn, entries);
			if (F_ISSET(conn, CONN_DEFUNCT)) {
				__repmgr_cleanup_connection(dbenv, conn);
				continue;
			}

			/*
			 * For a connect() still in progress, completion is
			 * reported as readability and/or writability, so ask
			 * for both.
			 */
			if (F_ISSET(conn, CONN_CONNECTING)) {
				FD_SET((u_int)conn->fd, &reads);
				FD_SET((u_int)conn->fd, &writes);
				if (conn->fd > maxfd)
					maxfd = conn->fd;
				continue;
			}

			/* Only poll for writability if output is pending. */
			if (!STAILQ_EMPTY(&conn->outbound_queue)) {
				FD_SET((u_int)conn->fd, &writes);
				if (conn->fd > maxfd)
					maxfd = conn->fd;
			}
			/*
			 * If we haven't yet gotten site's handshake, then read
			 * from it even if we're flow-controlling.
			 */
			if (!flow_control || !IS_VALID_EID(conn->eid)) {
				FD_SET((u_int)conn->fd, &reads);
				if (conn->fd > maxfd)
					maxfd = conn->fd;
			}
		}
		/*
		 * Decide how long to wait based on when it will next be time to
		 * retry an idle connection.  (List items are in order, so we
		 * only have to examine the first one.)
		 */
		if (TAILQ_EMPTY(&db_rep->retries))
			timeout_p = NULL;
		else {
			retry = TAILQ_FIRST(&db_rep->retries);

			timeout_p = &timeout;
			__repmgr_timeval_diff_current(
			    dbenv, &retry->time, timeout_p);
		}

		/* The one blocking call; the mutex is dropped around it. */
		UNLOCK_MUTEX(db_rep->mutex);

		if ((ret = select(maxfd + 1, &reads, &writes, NULL, timeout_p))
		    == -1) {
			switch (ret = errno) {
			case EINTR:
			case EWOULDBLOCK:
				LOCK_MUTEX(db_rep->mutex);
				continue; /* simply retry */
			default:
				/* (Mutex is not held on this path.) */
				__db_err(dbenv, ret, "select");
				return (ret);
			}
		}
		nready = ret;

		LOCK_MUTEX(db_rep->mutex);
		/* Retry processing runs even on a pure timeout wake-up. */
		if ((ret = __repmgr_retry_connections(dbenv)) != 0)
			goto out;
		if (nready == 0)
			continue;

		/*
		 * Traverse the linked list.  Almost like TAILQ_FOREACH, except
		 * that we need the ability to unlink an element along the way.
		 */
		for (conn = TAILQ_FIRST(&db_rep->connections);
		    conn != NULL;
		    conn = next) {
			next = TAILQ_NEXT(conn, entries);
			if (F_ISSET(conn, CONN_CONNECTING)) {
				if (FD_ISSET((u_int)conn->fd, &reads) ||
				    FD_ISSET((u_int)conn->fd, &writes)) {
					if ((ret = finish_connecting(dbenv,
					    conn)) == DB_REP_UNAVAIL) {
						if ((ret =
						    __repmgr_bust_connection(
						    dbenv, conn, TRUE)) != 0)
							goto out;
					} else if (ret != 0)
						goto out;
				}
				continue;
			}

			/*
			 * Here, the site is connected, and the FD_SET's are
			 * valid.
			 */
			if (FD_ISSET((u_int)conn->fd, &writes)) {
				if ((ret = __repmgr_write_some(
				    dbenv, conn)) == DB_REP_UNAVAIL) {
					if ((ret =
					    __repmgr_bust_connection(dbenv,
					    conn, TRUE)) != 0)
						goto out;
					continue;
				} else if (ret != 0)
					goto out;
			}

			if (!flow_control &&
			    FD_ISSET((u_int)conn->fd, &reads)) {
				if ((ret = __repmgr_read_from_site(dbenv, conn))
				    == DB_REP_UNAVAIL) {
					if ((ret =
					    __repmgr_bust_connection(dbenv,
					    conn, TRUE)) != 0)
						goto out;
					continue;
				} else if (ret != 0)
					goto out;
			}
		}

		/*
		 * Read any bytes in the signalling pipe.  Note that we don't
		 * actually need to do anything with them; they're just there to
		 * wake us up when necessary.
		 */
		if (FD_ISSET((u_int)db_rep->read_pipe, &reads)) {
			if (read(db_rep->read_pipe, buf, sizeof(buf)) <= 0) {
				ret = errno;
				goto out;
			} else if (db_rep->finished) {
				/* Orderly shutdown requested. */
				ret = 0;
				goto out;
			}
		}
		if (FD_ISSET((u_int)db_rep->listen_fd, &reads) &&
		    (ret = __repmgr_accept(dbenv)) != 0)
			goto out;
	}
out:
	UNLOCK_MUTEX(db_rep->mutex);
	return (ret);
}
+
/*
 * Completes a non-blocking connect() once select() reports the socket ready:
 * fetches the deferred connect result via SO_ERROR, and on success sends our
 * handshake.  On failure, tries the site's next address immediately, or
 * returns DB_REP_UNAVAIL when the address list is exhausted so the caller
 * can schedule a delayed retry.
 */
static int
finish_connecting(dbenv, conn)
	DB_ENV *dbenv;
	REPMGR_CONNECTION *conn;
{
	DB_REP *db_rep;
	REPMGR_SITE *site;
	socklen_t len;
	SITE_STRING_BUFFER buffer;
	u_int eid;
	int error, ret;

	/* SO_ERROR yields (and clears) the pending connect(2) outcome. */
	len = sizeof(error);
	if (getsockopt(
	    conn->fd, SOL_SOCKET, SO_ERROR, (sockopt_t)&error, &len) < 0)
		goto err_rpt;
	if (error) {
		/* Stash the result in errno so err_rpt can report it. */
		errno = error;
		goto err_rpt;
	}

	F_CLR(conn, CONN_CONNECTING);
	return (__repmgr_send_handshake(dbenv, conn));

err_rpt:
	db_rep = dbenv->rep_handle;

	/* Only outbound (actively initiated) connections get here. */
	DB_ASSERT(dbenv, IS_VALID_EID(conn->eid));
	eid = (u_int)conn->eid;

	site = SITE_FROM_EID(eid);
	__db_err(dbenv, errno,
	    "connecting to %s", __repmgr_format_site_loc(site, buffer));

	/* If we've exhausted the list of possible addresses, give up. */
	if (ADDR_LIST_NEXT(&site->net_addr) == NULL)
		return (DB_REP_UNAVAIL);

	/*
	 * This is just like a little mini-"bust_connection", except that we
	 * don't reschedule for later, 'cuz we're just about to try again right
	 * now.
	 */
	DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
	__repmgr_cleanup_connection(dbenv, conn);
	ret = __repmgr_connect_site(dbenv, eid);
	DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL);
	return (ret);
}
diff --git a/db/repmgr/repmgr_queue.c b/db/repmgr/repmgr_queue.c
new file mode 100644
index 000000000..be9adc2e7
--- /dev/null
+++ b/db/repmgr/repmgr_queue.c
@@ -0,0 +1,158 @@
+/*-
+ * See the file LICENSE for redistribution information.
+ *
+ * Copyright (c) 2006
+ * Oracle Corporation. All rights reserved.
+ *
+ * $Id: repmgr_queue.c,v 1.6 2006/08/24 14:46:26 bostic Exp $
+ */
+
+#include "db_config.h"
+
+#define __INCLUDE_NETWORKING 1
+#include "db_int.h"
+
/*
 * The input message queue: a FIFO of REPMGR_MESSAGEs, accessed under the
 * repmgr mutex (see the caller notes on queue_put/queue_size below).
 */
typedef STAILQ_HEAD(__repmgr_q_header, __repmgr_message) QUEUE_HEADER;
struct __repmgr_queue {
	int size;		/* Number of messages currently queued. */
	QUEUE_HEADER header;	/* Singly-linked tail queue of messages. */
};
+
+/*
+ * PUBLIC: int __repmgr_queue_create __P((DB_ENV *, DB_REP *));
+ */
+int
+__repmgr_queue_create(dbenv, db_rep)
+ DB_ENV *dbenv;
+ DB_REP *db_rep;
+{
+ REPMGR_QUEUE *q;
+ int ret;
+
+ COMPQUIET(dbenv, NULL);
+
+ if ((ret = __os_calloc(dbenv, 1, sizeof(REPMGR_QUEUE), &q)) != 0)
+ return (ret);
+ q->size = 0;
+ STAILQ_INIT(&q->header);
+ db_rep->input_queue = q;
+ return (0);
+}
+
+/*
+ * Frees not only the queue header, but also any messages that may be on it,
+ * along with their data buffers.
+ *
+ * PUBLIC: void __repmgr_queue_destroy __P((DB_ENV *));
+ */
+void
+__repmgr_queue_destroy(dbenv)
+ DB_ENV *dbenv;
+{
+ REPMGR_QUEUE *q;
+ REPMGR_MESSAGE *m;
+
+ if ((q = dbenv->rep_handle->input_queue) == NULL)
+ return;
+
+ while (!STAILQ_EMPTY(&q->header)) {
+ m = STAILQ_FIRST(&q->header);
+ STAILQ_REMOVE_HEAD(&q->header, entries);
+ __os_free(dbenv, m);
+ }
+ __os_free(dbenv, q);
+}
+
/*
 * PUBLIC: int __repmgr_queue_get __P((DB_ENV *, REPMGR_MESSAGE **));
 *
 * Get the first input message from the queue and return it to the caller. The
 * caller hereby takes responsibility for the entire message buffer, and should
 * free it when done.
 *
 * Note that caller is NOT expected to hold the mutex. This is asymmetric with
 * put(), because put() is expected to be called in a loop after select, where
 * it's already necessary to be holding the mutex.
 */
int
__repmgr_queue_get(dbenv, msgp)
	DB_ENV *dbenv;
	REPMGR_MESSAGE **msgp;
{
	DB_REP *db_rep;
	REPMGR_QUEUE *q;
	REPMGR_MESSAGE *m;
	int ret;

	ret = 0;
	db_rep = dbenv->rep_handle;
	q = db_rep->input_queue;

	LOCK_MUTEX(db_rep->mutex);
	/* Block until a message arrives or repmgr is shutting down. */
	while (STAILQ_EMPTY(&q->header) && !db_rep->finished) {
#ifdef DB_WIN32
		/*
		 * Windows has no condition variables here; emulate one with
		 * an event object: release the mutex and wait for the event
		 * in one call, then re-acquire the mutex.
		 */
		if (!ResetEvent(db_rep->queue_nonempty)) {
			ret = GetLastError();
			goto err;
		}
		/*
		 * NOTE(review): if this wait fails, we goto err without
		 * having re-acquired the mutex, yet "err" unlocks it --
		 * verify this path on Windows.
		 */
		if (SignalObjectAndWait(db_rep->mutex, db_rep->queue_nonempty,
		    INFINITE, FALSE) != WAIT_OBJECT_0) {
			ret = GetLastError();
			goto err;
		}
		LOCK_MUTEX(db_rep->mutex);
#else
		if ((ret = pthread_cond_wait(&db_rep->queue_nonempty,
		    &db_rep->mutex)) != 0)
			goto err;
#endif
	}
	/* Shutdown wins, even if messages remain queued. */
	if (db_rep->finished)
		ret = DB_REP_UNAVAIL;
	else {
		m = STAILQ_FIRST(&q->header);
		STAILQ_REMOVE_HEAD(&q->header, entries);
		q->size--;
		*msgp = m;
	}

err:
	UNLOCK_MUTEX(db_rep->mutex);
	return (ret);
}
+
+/*
+ * PUBLIC: int __repmgr_queue_put __P((DB_ENV *, REPMGR_MESSAGE *));
+ *
+ * !!!
+ * Caller must hold repmgr->mutex.
+ */
+int
+__repmgr_queue_put(dbenv, msg)
+ DB_ENV *dbenv;
+ REPMGR_MESSAGE *msg;
+{
+ DB_REP *db_rep;
+ REPMGR_QUEUE *q;
+
+ db_rep = dbenv->rep_handle;
+ q = db_rep->input_queue;
+
+ STAILQ_INSERT_TAIL(&q->header, msg, entries);
+ q->size++;
+
+ return (__repmgr_signal(&db_rep->queue_nonempty));
+}
+
+/*
+ * PUBLIC: int __repmgr_queue_size __P((DB_ENV *));
+ *
+ * !!!
+ * Caller must hold repmgr->mutex.
+ */
+int
+__repmgr_queue_size(dbenv)
+ DB_ENV *dbenv;
+{
+ return (dbenv->rep_handle->input_queue->size);
+}
diff --git a/db/repmgr/repmgr_sel.c b/db/repmgr/repmgr_sel.c
new file mode 100644
index 000000000..bbb29b0ee
--- /dev/null
+++ b/db/repmgr/repmgr_sel.c
@@ -0,0 +1,875 @@
+/*-
+ * See the file LICENSE for redistribution information.
+ *
+ * Copyright (c) 2005
+ * Oracle Corporation. All rights reserved.
+ *
+ * $Id: repmgr_sel.c,v 1.25 2006/09/19 14:14:11 mjc Exp $
+ */
+
+#include "db_config.h"
+
+#define __INCLUDE_NETWORKING 1
+#include "db_int.h"
+
+static int __repmgr_connect __P((DB_ENV*, socket_t *, REPMGR_SITE *));
+static int record_ack __P((DB_ENV *, REPMGR_SITE *, DB_REPMGR_ACK *));
+static int dispatch_phase_completion __P((DB_ENV *, REPMGR_CONNECTION *));
+static int notify_handshake __P((DB_ENV *, REPMGR_CONNECTION *));
+
+/*
+ * PUBLIC: void *__repmgr_select_thread __P((void *));
+ */
+void *
+__repmgr_select_thread(args)
+ void *args;
+{
+ DB_ENV *dbenv = args;
+ int ret;
+
+ if ((ret = __repmgr_select_loop(dbenv)) != 0) {
+ __db_err(dbenv, ret, "select loop failed");
+ __repmgr_thread_failure(dbenv, ret);
+ }
+ return (NULL);
+}
+
/*
 * Accepts a pending incoming connection on the listening socket, makes it
 * non-blocking, wraps it in a new REPMGR_CONNECTION and sends our handshake.
 * Transient/innocuous accept() failures are swallowed (returns 0).
 *
 * PUBLIC: int __repmgr_accept __P((DB_ENV *));
 */
int
__repmgr_accept(dbenv)
	DB_ENV *dbenv;
{
	DB_REP *db_rep;
	REPMGR_CONNECTION *conn;
	struct sockaddr_in siaddr;
	socklen_t addrlen;
	socket_t s;
	int ret;
#ifdef DB_WIN32
	WSAEVENT event_obj;
#endif
#ifdef DIAGNOSTIC
	DB_MSGBUF mb;
#endif

	db_rep = dbenv->rep_handle;
	addrlen = sizeof(siaddr);
	if ((s = accept(db_rep->listen_fd, (struct sockaddr *)&siaddr,
	    &addrlen)) == -1) {
		/*
		 * Some errors are innocuous and so should be ignored.  MSDN
		 * Library documents the Windows ones; the Unix ones are
		 * advocated in Stevens' UNPv1, section 16.6; and Linux
		 * Application Development, p. 416.
		 */
		switch (ret = net_errno) {
#ifdef DB_WIN32
		case WSAECONNRESET:
		case WSAEWOULDBLOCK:
#else
		case EINTR:
		case EWOULDBLOCK:
		case ECONNABORTED:
		case ENETDOWN:
#ifdef EPROTO
		case EPROTO:
#endif
		case ENOPROTOOPT:
		case EHOSTDOWN:
#ifdef ENONET
		case ENONET:
#endif
		case EHOSTUNREACH:
		case EOPNOTSUPP:
		case ENETUNREACH:
#endif
			RPRINT(dbenv, (dbenv, &mb,
			    "accept error %d considered innocuous", ret));
			return (0);
		default:
			__db_err(dbenv, ret, "accept error");
			return (ret);
		}
	}
	RPRINT(dbenv, (dbenv, &mb, "accepted a new connection"));

	if ((ret = __repmgr_set_nonblocking(s)) != 0) {
		__db_err(dbenv, ret, "can't set nonblock after accept");
		(void)closesocket(s);
		return (ret);
	}

#ifdef DB_WIN32
	/* On Windows, I/O readiness is reported via a WSA event object. */
	if ((event_obj = WSACreateEvent()) == WSA_INVALID_EVENT) {
		ret = net_errno;
		__db_err(dbenv, ret, "can't create WSA event");
		(void)closesocket(s);
		return (ret);
	}
	if (WSAEventSelect(s, event_obj, FD_READ|FD_CLOSE) == SOCKET_ERROR) {
		ret = net_errno;
		__db_err(dbenv, ret, "can't set desired event bits");
		(void)WSACloseEvent(event_obj);
		(void)closesocket(s);
		return (ret);
	}
#endif
	if ((ret = __repmgr_new_connection(dbenv, &conn, s, 0)) != 0) {
#ifdef DB_WIN32
		(void)WSACloseEvent(event_obj);
#endif
		(void)closesocket(s);
		return (ret);
	}
	/* We don't yet know which site this is; its handshake will say. */
	conn->eid = -1;
#ifdef DB_WIN32
	conn->event_object = event_obj;
#endif

	switch (ret = __repmgr_send_handshake(dbenv, conn)) {
	case 0:
		return (0);
	case DB_REP_UNAVAIL:
		return (__repmgr_bust_connection(dbenv, conn, TRUE));
	default:
		return (ret);
	}
}
+
/*
 * Initiates connection attempts for any sites on the idle list whose retry
 * times have expired.
 *
 * PUBLIC: int __repmgr_retry_connections __P((DB_ENV *));
 *
 * !!!
 * Assumes caller holds the mutex.
 */
int
__repmgr_retry_connections(dbenv)
	DB_ENV *dbenv;
{
	DB_REP *db_rep;
	REPMGR_RETRY *retry;
	repmgr_netaddr_t *addr;
	repmgr_timeval_t now;
	ADDRINFO *list;
	u_int eid;
	int ret;

	db_rep = dbenv->rep_handle;
	__os_clock(dbenv, &now.tv_sec, &now.tv_usec);

	/* The retry list is kept sorted by deadline, earliest first. */
	while (!TAILQ_EMPTY(&db_rep->retries)) {
		retry = TAILQ_FIRST(&db_rep->retries);
		if (__repmgr_timeval_cmp(&retry->time, &now) > 0)
			break;	/* since items are in time order */

		TAILQ_REMOVE(&db_rep->retries, retry, entries);

		eid = retry->eid;
		__os_free(dbenv, retry);

		/*
		 * If have never yet successfully resolved this site's host
		 * name, try to do so now.
		 *
		 * (Throughout all the rest of repmgr, we almost never do any
		 * sort of blocking operation in the select thread. This is the
		 * sole exception to that rule. Fortunately, it should rarely
		 * happen. It only happens for a site that we only learned
		 * about because it connected to us: not only were we not
		 * configured to know about it, but we also never got a NEWSITE
		 * message about it. And even then only after the connection
		 * fails and we want to retry it from this end.)
		 */
		addr = &SITE_FROM_EID(eid)->net_addr;
		if (ADDR_LIST_FIRST(addr) == NULL) {
			if (__repmgr_getaddr(dbenv,
			    addr->host, addr->port, 0, &list) == 0) {
				addr->address_list = list;
				(void)ADDR_LIST_FIRST(addr);
			} else if ((ret = __repmgr_schedule_connection_attempt(
			    dbenv, eid, FALSE)) != 0)
				return (ret);
			else
				/* Resolution failed; try again later. */
				continue;
		}
		if ((ret = __repmgr_connect_site(dbenv, eid)) != 0)
			return (ret);
	}
	return (0);
}
+
+/*
+ * PUBLIC: int __repmgr_first_try_connections __P((DB_ENV *));
+ *
+ * !!!
+ * Assumes caller holds the mutex.
+ */
+int
+__repmgr_first_try_connections(dbenv)
+ DB_ENV *dbenv;
+{
+ DB_REP *db_rep;
+ u_int eid;
+ int ret;
+
+ db_rep = dbenv->rep_handle;
+ for (eid=0; eid<db_rep->site_cnt; eid++) {
+ ADDR_LIST_FIRST(&SITE_FROM_EID(eid)->net_addr);
+ if ((ret = __repmgr_connect_site(dbenv, eid)) != 0)
+ return (ret);
+ }
+ return (0);
+}
+
/*
 * Tries to establish a connection with the site indicated by the given eid,
 * starting with the "current" element of its address list and trying as many
 * addresses as necessary until the list is exhausted.
 *
 * PUBLIC: int __repmgr_connect_site __P((DB_ENV *, u_int eid));
 */
int
__repmgr_connect_site(dbenv, eid)
	DB_ENV *dbenv;
	u_int eid;
{
	DB_REP *db_rep;
	REPMGR_SITE *site;
	REPMGR_CONNECTION *con;
	socket_t s;
	u_int32_t flags;
	int ret;
#ifdef DB_WIN32
	long desired_event;
	WSAEVENT event_obj;
#endif

	db_rep = dbenv->rep_handle;
	site = SITE_FROM_EID(eid);

	flags = 0;
	/*
	 * The non-blocking connect either completes immediately (0), is left
	 * pending (INPROGRESS, finished later by finish_connecting), or has
	 * failed for every address -- in which case just schedule a retry.
	 */
	switch (ret = __repmgr_connect(dbenv, &s, site)) {
	case 0:
		flags = 0;
#ifdef DB_WIN32
		desired_event = FD_READ|FD_CLOSE;
#endif
		break;
	case INPROGRESS:
		flags = CONN_CONNECTING;
#ifdef DB_WIN32
		desired_event = FD_CONNECT;
#endif
		break;
	default:
		return (
		    __repmgr_schedule_connection_attempt(dbenv, eid, FALSE));
	}

#ifdef DB_WIN32
	/* On Windows, readiness is delivered through a WSA event object. */
	if ((event_obj = WSACreateEvent()) == WSA_INVALID_EVENT) {
		ret = net_errno;
		__db_err(dbenv, ret, "can't create WSA event");
		(void)closesocket(s);
		return (ret);
	}
	if (WSAEventSelect(s, event_obj, desired_event) == SOCKET_ERROR) {
		ret = net_errno;
		__db_err(dbenv, ret, "can't set desired event bits");
		(void)WSACloseEvent(event_obj);
		(void)closesocket(s);
		return (ret);
	}
#endif

	/* From here on the connection object owns the socket. */
	if ((ret = __repmgr_new_connection(dbenv, &con, s, flags))
	    != 0) {
#ifdef DB_WIN32
		(void)WSACloseEvent(event_obj);
#endif
		(void)closesocket(s);
		return (ret);
	}
#ifdef DB_WIN32
	con->event_object = event_obj;
#endif

	/* If the connection is already up, greet the peer right away. */
	if (flags == 0) {
		switch (ret = __repmgr_send_handshake(dbenv, con)) {
		case 0:
			break;
		case DB_REP_UNAVAIL:
			return (__repmgr_bust_connection(dbenv, con, TRUE));
		default:
			return (ret);
		}
	}

	con->eid = (int)eid;

	site->ref.conn = con;
	site->state = SITE_CONNECTED;
	return (0);
}
+
/*
 * Creates a non-blocking socket and starts a connection to the given site,
 * walking its resolved address list until an attempt succeeds (returns 0),
 * is left pending (returns INPROGRESS), or the list runs out (returns the
 * last network error).  On 0/INPROGRESS the socket is handed back through
 * "socket_result"; otherwise no socket is left open.
 */
static int
__repmgr_connect(dbenv, socket_result, site)
	DB_ENV *dbenv;
	socket_t *socket_result;
	REPMGR_SITE *site;
{
	repmgr_netaddr_t *addr;
	ADDRINFO *ai;
	socket_t s;
	char *why;
	int ret;
	SITE_STRING_BUFFER buffer;
#ifdef DIAGNOSTIC
	DB_MSGBUF mb;
#endif

	/*
	 * Lint doesn't know about DB_ASSERT, so it can't tell that this
	 * loop will always get executed at least once, giving 'why' a value.
	 */
	COMPQUIET(why, "");
	addr = &site->net_addr;
	ai = ADDR_LIST_CURRENT(addr);
	DB_ASSERT(dbenv, ai != NULL);
	for (; ai != NULL; ai = ADDR_LIST_NEXT(addr)) {

		if ((s = socket(ai->ai_family,
		    ai->ai_socktype, ai->ai_protocol)) == SOCKET_ERROR) {
			why = "can't create socket to connect";
			continue;
		}

		if ((ret = __repmgr_set_nonblocking(s)) != 0) {
			__db_err(dbenv,
			    ret, "can't make nonblock socket to connect");
			(void)closesocket(s);
			return (ret);
		}

		/*
		 * Note: when connect() succeeds immediately, "ret" still
		 * holds the 0 assigned by the __repmgr_set_nonblocking()
		 * call just above -- the success test below relies on that.
		 */
		if (connect(s, ai->ai_addr, (socklen_t)ai->ai_addrlen) != 0)
			ret = net_errno;

		if (ret == 0 || ret == INPROGRESS) {
			*socket_result = s;
			RPRINT(dbenv, (dbenv, &mb,
			    "init connection to %s with result %d",
			    __repmgr_format_site_loc(site, buffer), ret));
			return (ret);
		}

		why = "connection failed";
		(void)closesocket(s);
	}

	/* We've exhausted all possible addresses. */
	ret = net_errno;
	__db_err(dbenv, ret, "%s to %s", why,
	    __repmgr_format_site_loc(site, buffer));
	return (ret);
}
+
+/*
+ * PUBLIC: int __repmgr_send_handshake __P((DB_ENV *, REPMGR_CONNECTION *));
+ */
+int
+__repmgr_send_handshake(dbenv, conn)
+ DB_ENV *dbenv;
+ REPMGR_CONNECTION *conn;
+{
+ DB_REP *db_rep;
+ REP *rep;
+ repmgr_netaddr_t *my_addr;
+ DB_REPMGR_HANDSHAKE buffer;
+ DBT cntrl, rec;
+
+ db_rep = dbenv->rep_handle;
+ rep = db_rep->region;
+ my_addr = &db_rep->my_addr;
+
+ /*
+ * Remind us that we need to fix priority on the rep API not to be an
+ * int, if we're going to pass it across the wire.
+ */
+ DB_ASSERT(dbenv, sizeof(u_int32_t) >= sizeof(int));
+
+ buffer.version = DB_REPMGR_VERSION;
+ /* TODO: using network byte order is pointless if we only do it here. */
+ buffer.priority = htonl((u_int32_t)rep->priority);
+ buffer.port = my_addr->port;
+ cntrl.data = &buffer;
+ cntrl.size = sizeof(buffer);
+
+ DB_SET_DBT(rec, my_addr->host, strlen(my_addr->host) + 1);
+
+ return (__repmgr_send_one(dbenv, conn, REPMGR_HANDSHAKE, &cntrl, &rec));
+}
+
/*
 * PUBLIC: int __repmgr_read_from_site __P((DB_ENV *, REPMGR_CONNECTION *));
 *
 * !!!
 * Caller is assumed to hold repmgr->mutex, 'cuz we call queue_put() from here.
 */
int
__repmgr_read_from_site(dbenv, conn)
	DB_ENV *dbenv;
	REPMGR_CONNECTION *conn;
{
	SITE_STRING_BUFFER buffer;
	size_t nr;
	int ret;

	/*
	 * Keep reading pieces as long as we're making some progress, or until
	 * we complete the current read phase.
	 */
	for (;;) {
		if ((ret = __repmgr_readv(conn->fd,
		    &conn->iovecs.vectors[conn->iovecs.offset],
		    conn->iovecs.count - conn->iovecs.offset, &nr)) != 0) {
			switch (ret) {
#ifndef DB_WIN32
			case EINTR:
				/* Interrupted by a signal; simply retry. */
				continue;
#endif
			case WOULDBLOCK:
				/* No more data available now; not an error. */
				return (0);
			default:
				(void)__repmgr_format_eid_loc(dbenv->rep_handle,
				    conn->eid, buffer);
				__db_err(dbenv, ret,
				    "can't read from %s", buffer);
				return (DB_REP_UNAVAIL);
			}
		}

		if (nr > 0) {
			/*
			 * The read phase is complete once the whole iovec
			 * area has been consumed.
			 */
			if (__repmgr_update_consumed(&conn->iovecs, nr))
				return (dispatch_phase_completion(dbenv,
				    conn));
		} else {
			/* A zero-byte read means the peer closed. */
			(void)__repmgr_format_eid_loc(dbenv->rep_handle,
			    conn->eid, buffer);
			__db_errx(dbenv, "EOF on connection from %s", buffer);
			return (DB_REP_UNAVAIL);
		}
	}
}
+
+/*
+ * Handles whatever needs to be done upon the completion of a reading phase on a
+ * given connection.
+ */
/*
 * Handles whatever needs to be done upon the completion of a reading phase on a
 * given connection.
 *
 * In SIZES_PHASE we have just read the fixed-size header (message type plus
 * the two piece lengths) and set up buffers for the variable-size body; in
 * DATA_PHASE we have a complete message and process it.  Acks and handshakes
 * are handled inline; rep messages are queued for the message thread pool.
 *
 * Returns 0, an allocation error, or DB_REP_UNAVAIL on protocol violations
 * (the caller is expected to tear down the connection in that case).
 */
static int
dispatch_phase_completion(dbenv, conn)
	DB_ENV *dbenv;
	REPMGR_CONNECTION *conn;
{
#define	MEM_ALIGN sizeof(double)
	DB_REP *db_rep;
	DB_REPMGR_ACK *ack;
	REPMGR_SITE *site;
	DB_REPMGR_HANDSHAKE *handshake;
	REPMGR_RETRY *retry;
	repmgr_netaddr_t addr;
	DBT *dbt;
	u_int32_t control_size, rec_size;
	size_t memsize, control_offset, rec_offset;
	void *membase;
	char *host;
	u_int port;
	int ret, eid;
#ifdef DIAGNOSTIC
	DB_MSGBUF mb;
#endif

	db_rep = dbenv->rep_handle;
	switch (conn->reading_phase) {
	case SIZES_PHASE:
		/*
		 * We've received the header: a message type and the lengths of
		 * the two pieces of the message.  Set up buffers to read the
		 * two pieces.
		 */

		/*
		 * A passively accepted connection (no EID yet) must identify
		 * itself with a handshake before anything else.
		 */
		if (conn->msg_type != REPMGR_HANDSHAKE &&
		    !IS_VALID_EID(conn->eid)) {
			__db_errx(dbenv,
	    "expected handshake as first msg from passively connected site");
			return (DB_REP_UNAVAIL);
		}

		__repmgr_iovec_init(&conn->iovecs);
		control_size = ntohl(conn->control_size_buf);
		rec_size = ntohl(conn->rec_size_buf);
		if (conn->msg_type == REPMGR_REP_MESSAGE) {
			/*
			 * Allocate a block of memory large enough to hold a
			 * DB_REPMGR_MESSAGE wrapper, plus the (one or) two DBT
			 * data areas that it points to.  Start by calculating
			 * the total memory needed, rounding up for the start of
			 * each DBT, to ensure possible alignment requirements.
			 */
			/*
			 * TODO: Keith says we don't need to mess with this: put
			 * the burden on base replication code.
			 */
			memsize = (size_t)
			    DB_ALIGN(sizeof(REPMGR_MESSAGE), MEM_ALIGN);
			control_offset = memsize;
			memsize += control_size;
			if (rec_size > 0) {
				memsize = (size_t)DB_ALIGN(memsize, MEM_ALIGN);
				rec_offset = memsize;
				memsize += rec_size;
			} else
				COMPQUIET(rec_offset, 0);
			if ((ret = __os_malloc(dbenv, memsize, &membase)) != 0)
				return (ret);
			conn->input.rep_message = membase;
			memset(&conn->input.rep_message->control, 0,
			    sizeof(DBT));
			memset(&conn->input.rep_message->rec, 0, sizeof(DBT));
			conn->input.rep_message->originating_eid = conn->eid;
			conn->input.rep_message->control.size = control_size;
			conn->input.rep_message->control.data =
			    (u_int8_t*)membase + control_offset;
			__repmgr_add_buffer(&conn->iovecs,
			    conn->input.rep_message->control.data,
			    control_size);

			conn->input.rep_message->rec.size = rec_size;
			if (rec_size > 0) {
				conn->input.rep_message->rec.data =
				    (u_int8_t*)membase + rec_offset;
				__repmgr_add_buffer(&conn->iovecs,
				    conn->input.rep_message->rec.data,
				    rec_size);
			} else
				conn->input.rep_message->rec.data = NULL;

		} else {
			if (control_size == 0) {
				__db_errx(
				    dbenv, "illegal size for non-rep msg");
				return (DB_REP_UNAVAIL);
			}
			conn->input.repmgr_msg.cntrl.size = control_size;
			conn->input.repmgr_msg.rec.size = rec_size;

			/*
			 * TODO: consider allocating space for ack's just once
			 * (lazily?), or even providing static space for it in
			 * the conn structure itself, thus avoiding a bit of
			 * thrashing in the memory pool.  If we do that, then of
			 * course we must get rid of the corresponding call to
			 * free(), below.
			 */
			dbt = &conn->input.repmgr_msg.cntrl;
			dbt->size = control_size;
			if ((ret = __os_malloc(dbenv, control_size,
			    &dbt->data)) != 0)
				return (ret);
			__repmgr_add_dbt(&conn->iovecs, dbt);

			dbt = &conn->input.repmgr_msg.rec;
			if ((dbt->size = rec_size) > 0) {
				if ((ret = __os_malloc(dbenv, rec_size,
				    &dbt->data)) != 0) {
					__os_free(dbenv,
					    conn->input.repmgr_msg.cntrl.data);
					return (ret);
				}
				__repmgr_add_dbt(&conn->iovecs, dbt);
			}
		}

		conn->reading_phase = DATA_PHASE;
		break;

	case DATA_PHASE:
		/*
		 * We have a complete message, so process it.  Acks and
		 * handshakes get processed here, in line.  Regular rep messages
		 * get posted to a queue, to be handled by a thread from the
		 * message thread pool.
		 *
		 * NOTE(review): several of the DB_REP_UNAVAIL error returns
		 * below leave the cntrl/rec buffers allocated above un-freed;
		 * presumably the subsequent connection teardown reclaims them
		 * -- verify against the caller's cleanup path.
		 */
		switch (conn->msg_type) {
		case REPMGR_ACK:
			/*
			 * Extract the LSN.  Save it only if it is an
			 * improvement over what the site has already ack'ed.
			 */
			ack = conn->input.repmgr_msg.cntrl.data;
			if (conn->input.repmgr_msg.cntrl.size != sizeof(*ack) ||
			    conn->input.repmgr_msg.rec.size != 0) {
				__db_errx(dbenv, "bad ack msg size");
				return (DB_REP_UNAVAIL);
			}
			if ((ret = record_ack(dbenv, SITE_FROM_EID(conn->eid),
			    ack)) != 0)
				return (ret);
			__os_free(dbenv, conn->input.repmgr_msg.cntrl.data);
			break;

		case REPMGR_HANDSHAKE:
			handshake = conn->input.repmgr_msg.cntrl.data;
			/* Check the version field before trusting the rest. */
			if (conn->input.repmgr_msg.cntrl.size >=
			    sizeof(handshake->version) &&
			    handshake->version != DB_REPMGR_VERSION) {
				__db_errx(dbenv,
			"mismatched repmgr message protocol version (%lu)",
				    (u_long)handshake->version);
				return (DB_REP_UNAVAIL);
			}
			if (conn->input.repmgr_msg.cntrl.size !=
			    sizeof(*handshake) ||
			    conn->input.repmgr_msg.rec.size == 0) {
				__db_errx(dbenv, "bad handshake msg size");
				return (DB_REP_UNAVAIL);
			}

			port = handshake->port;
			host = conn->input.repmgr_msg.rec.data;
			/* Force NUL-termination of the peer-supplied host. */
			host[conn->input.repmgr_msg.rec.size-1] = '\0';

			RPRINT(dbenv, (dbenv, &mb,
			    "got handshake %s:%u, pri %lu", host, port,
			    (u_long)ntohl(handshake->priority)));

			if (IS_VALID_EID(conn->eid)) {
				/*
				 * We must have initiated this as an outgoing
				 * connection, since we already know the EID.
				 * All we need from the handshake is the
				 * priority.
				 */
				site = SITE_FROM_EID(conn->eid);
				RPRINT(dbenv, (dbenv, &mb,
				    "handshake from connection to %s:%lu",
				    site->net_addr.host,
				    (u_long)site->net_addr.port));
			} else {
				if (IS_VALID_EID(eid =
				    __repmgr_find_site(dbenv, host, port))) {
					site = SITE_FROM_EID(eid);
					if (site->state == SITE_IDLE) {
						RPRINT(dbenv, (dbenv, &mb,
					    "handshake from previously idle site"));
						retry = site->ref.retry;
						TAILQ_REMOVE(&db_rep->retries,
						    retry, entries);
						__os_free(dbenv, retry);

						conn->eid = eid;
						site->state = SITE_CONNECTED;
						site->ref.conn = conn;
					} else {
						/*
						 * We got an incoming connection
						 * for a site we were already
						 * connected to; discard it.
						 */
						__db_errx(dbenv,
				    "redundant incoming connection will be ignored");
						return (DB_REP_UNAVAIL);
					}
				} else {
					RPRINT(dbenv, (dbenv, &mb,
					    "handshake introduces unknown site"));
					if ((ret = __repmgr_pack_netaddr(
					    dbenv, host, port, NULL,
					    &addr)) != 0)
						return (ret);
					if ((ret = __repmgr_new_site(
					    dbenv, &site, &addr,
					    SITE_CONNECTED)) != 0) {
						__repmgr_cleanup_netaddr(dbenv,
						    &addr);
						return (ret);
					}
					conn->eid = EID_FROM_SITE(site);
					site->ref.conn = conn;
				}
			}

			/* TODO: change priority to be u_int32_t. */
			DB_ASSERT(dbenv, sizeof(int) == sizeof(u_int32_t));
			site->priority = (int)ntohl(handshake->priority);

			if ((ret = notify_handshake(dbenv, conn)) != 0)
				return (ret);

			__os_free(dbenv, conn->input.repmgr_msg.cntrl.data);
			__os_free(dbenv, conn->input.repmgr_msg.rec.data);
			break;

		case REPMGR_REP_MESSAGE:
			if ((ret = __repmgr_queue_put(dbenv,
			    conn->input.rep_message)) != 0)
				return (ret);
			/*
			 * The queue has taken over responsibility for the
			 * rep_message buffer, and will free it later.
			 */
			break;

		default:
			__db_errx(dbenv, "unknown msg type rcvd: %d",
			    (int)conn->msg_type);
			return (DB_REP_UNAVAIL);
		}

		/* Get ready to read the next message's header. */
		__repmgr_reset_for_reading(conn);
		break;

	default:
		DB_ASSERT(dbenv, FALSE);
	}

	return (0);
}
+
+/*
+ * Performs any processing needed upon the receipt of a handshake.
+ *
+ * !!!
+ * Caller must hold mutex.
+ */
+static int
+notify_handshake(dbenv, conn)
+ DB_ENV *dbenv;
+ REPMGR_CONNECTION *conn;
+{
+ DB_REP *db_rep;
+#ifdef DIAGNOSTIC
+ DB_MSGBUF mb;
+#endif
+
+ COMPQUIET(conn, NULL);
+
+ db_rep = dbenv->rep_handle;
+ /*
+ * If we're moping around wishing we knew who the master was, then
+ * getting in touch with another site might finally provide sufficient
+ * connectivity to find out. But just do this once, because otherwise
+ * we get messages while the subsequent rep_start operations are going
+ * on, and rep tosses them in that case. (TODO: this may need further
+ * refinement.)
+ */
+ if (db_rep->master_eid == DB_EID_INVALID && !db_rep->done_one) {
+ db_rep->done_one = TRUE;
+ RPRINT(dbenv, (dbenv, &mb,
+ "handshake with no known master to wake election thread"));
+ return (__repmgr_init_election(dbenv, ELECT_REPSTART));
+ }
+ return (0);
+}
+
+static int
+record_ack(dbenv, site, ack)
+ DB_ENV *dbenv;
+ REPMGR_SITE *site;
+ DB_REPMGR_ACK *ack;
+{
+ DB_REP *db_rep;
+ int ret;
+#ifdef DIAGNOSTIC
+ DB_MSGBUF mb;
+ SITE_STRING_BUFFER buffer;
+#endif
+
+ db_rep = dbenv->rep_handle;
+
+ /* Ignore stale acks. */
+ if (ack->generation < db_rep->generation) {
+ RPRINT(dbenv, (dbenv, &mb,
+ "ignoring stale ack (%lu<%lu), from %s",
+ (u_long)ack->generation, (u_long)db_rep->generation,
+ __repmgr_format_site_loc(site, buffer)));
+ return (0);
+ }
+ RPRINT(dbenv, (dbenv, &mb,
+ "got ack [%lu][%lu](%lu) from %s", (u_long)ack->lsn.file,
+ (u_long)ack->lsn.offset, (u_long)ack->generation,
+ __repmgr_format_site_loc(site, buffer)));
+
+ /*
+ * TODO: what about future ones? Ideally, you'd like to wake up any
+ * waiting send() threads and have them return DB_REP_OUTDATED or
+ * something. But a mechanism to do that would be messy, and it almost
+ * seems not worth it, since (1) this almost can't happen; and (2) if we
+ * just ignore it, eventually the send() calls will time out (or not),
+ * and as long as we don't mistakenly ack something. The only advantage
+ * to doing something is more timely failure notification to the
+ * application, in (what I think is) an extremely rare situation.
+ */
+ if (ack->generation == db_rep->generation &&
+ log_compare(&ack->lsn, &site->max_ack) == 1) {
+ memcpy(&site->max_ack, &ack->lsn, sizeof(DB_LSN));
+ if ((ret = __repmgr_wake_waiting_senders(dbenv)) != 0)
+ return (ret);
+ }
+ return (0);
+}
+
+/*
+ * PUBLIC: int __repmgr_write_some __P((DB_ENV *, REPMGR_CONNECTION *));
+ */
/*
 * Drains as much of the connection's outbound queue as the socket will
 * accept without blocking.  Partially-sent messages keep their progress in
 * output->offset; fully-sent messages are dequeued and released when their
 * reference count drops to zero.
 *
 * PUBLIC: int __repmgr_write_some __P((DB_ENV *, REPMGR_CONNECTION *));
 */
int
__repmgr_write_some(dbenv, conn)
	DB_ENV *dbenv;
	REPMGR_CONNECTION *conn;
{
	REPMGR_FLAT *msg;
	QUEUED_OUTPUT *output;
	int bytes, ret;

	while (!STAILQ_EMPTY(&conn->outbound_queue)) {
		output = STAILQ_FIRST(&conn->outbound_queue);
		msg = output->msg;
		if ((bytes = send(conn->fd, &msg->data[output->offset],
		    (size_t)msg->length - output->offset, 0)) == SOCKET_ERROR) {
			/* Socket full: stop for now, not an error. */
			if ((ret = net_errno) == WOULDBLOCK)
				return (0);
			else {
				__db_err(dbenv, ret, "writing data");
				return (DB_REP_UNAVAIL);
			}
		}

		/* Message fully sent: dequeue and drop our reference. */
		if ((output->offset += (size_t)bytes) >= msg->length) {
			STAILQ_REMOVE_HEAD(&conn->outbound_queue, entries);
			__os_free(dbenv, output);
			conn->out_queue_length--;
			if (--msg->ref_count <= 0)
				__os_free(dbenv, msg);
		}
	}

#ifdef DB_WIN32
	/*
	 * With the queue now empty, it's time to relinquish ownership of this
	 * connection again, so that the next call to send() can write the
	 * message in line, instead of posting it to the queue for us.
	 */
	if (WSAEventSelect(conn->fd, conn->event_object, FD_READ|FD_CLOSE)
	    == SOCKET_ERROR) {
		ret = net_errno;
		__db_err(dbenv, ret, "can't remove FD_WRITE event bit");
		return (ret);
	}
#endif

	return (0);
}
diff --git a/db/repmgr/repmgr_stat.c b/db/repmgr/repmgr_stat.c
new file mode 100644
index 000000000..cbd7113ee
--- /dev/null
+++ b/db/repmgr/repmgr_stat.c
@@ -0,0 +1,116 @@
+/*-
+ * See the file LICENSE for redistribution information.
+ *
+ * Copyright (c) 2005-2006
+ * Oracle Corporation. All rights reserved.
+ *
+ * $Id: repmgr_stat.c,v 1.28 2006/09/08 19:22:42 bostic Exp $
+ */
+
+#include "db_config.h"
+
+#define __INCLUDE_NETWORKING 1
+#include "db_int.h"
+
+/*
+ * PUBLIC: int __repmgr_site_list __P((DB_ENV *, u_int *, DB_REPMGR_SITE **));
+ */
/*
 * Builds a caller-owned snapshot of the known sites: an array of
 * DB_REPMGR_SITE structs followed, in the same single allocation, by the
 * NUL-terminated host-name strings the structs point into.  The caller frees
 * the whole thing with one __os_ufree.
 *
 * PUBLIC: int __repmgr_site_list __P((DB_ENV *, u_int *, DB_REPMGR_SITE **));
 */
int
__repmgr_site_list(dbenv, countp, listp)
	DB_ENV *dbenv;
	u_int *countp;
	DB_REPMGR_SITE **listp;
{
	DB_REP *db_rep;
	DB_REPMGR_SITE *status;
	REPMGR_SITE *site;
	size_t array_size, total_size;
	u_int count, i;
	int locked, ret;
	char *name;

	db_rep = dbenv->rep_handle;
	/* Only take the mutex if repmgr has actually been started. */
	if (REPMGR_SYNC_INITED(db_rep)) {
		LOCK_MUTEX(db_rep->mutex);
		locked = TRUE;
	} else
		locked = FALSE;

	/* Initialize for empty list or error return. */
	ret = 0;
	*countp = 0;
	*listp = NULL;

	/* First, add up how much memory we need for the host names. */
	if ((count = db_rep->site_cnt) == 0)
		goto err;

	array_size = sizeof(DB_REPMGR_SITE) * count;
	total_size = array_size;
	for (i = 0; i < count; i++) {
		site = &db_rep->sites[i];

		/* Make room for the NUL terminating byte. */
		total_size += strlen(site->net_addr.host) + 1;
	}

	if ((ret = __os_umalloc(dbenv, total_size, &status)) != 0)
		goto err;

	/*
	 * Put the storage for the host names after the array of structs.  This
	 * way, the caller can free the whole thing in one single operation.
	 */
	name = (char *)((u_int8_t *)status + array_size);
	for (i = 0; i < count; i++) {
		site = &db_rep->sites[i];

		status[i].eid = EID_FROM_SITE(site);

		status[i].host = name;
		(void)strcpy(name, site->net_addr.host);
		name += strlen(name) + 1;

		status[i].port = site->net_addr.port;
		status[i].status = site->state == SITE_CONNECTED ?
		    DB_REPMGR_CONNECTED : DB_REPMGR_DISCONNECTED;
	}

	*countp = count;
	*listp = status;

err:	if (locked)
		UNLOCK_MUTEX(db_rep->mutex);
	return (ret);
}
+
+/*
+ * PUBLIC: int __repmgr_print_stats __P((DB_ENV *));
+ */
+int
+__repmgr_print_stats(dbenv)
+ DB_ENV *dbenv;
+{
+ DB_REPMGR_SITE *list;
+ u_int count, i;
+ int ret;
+
+ if ((ret = __repmgr_site_list(dbenv, &count, &list)) != 0)
+ return (ret);
+
+ if (count == 0)
+ return (0);
+
+ __db_msg(dbenv, "%s", DB_GLOBAL(db_line));
+ __db_msg(dbenv, "DB_REPMGR site information:");
+
+ for (i = 0; i < count; ++i) {
+ __db_msg(dbenv, "%s (eid: %d, port: %u, %sconnected)",
+ list[i].host, list[i].eid, list[i].port,
+ list[i].status == DB_REPMGR_CONNECTED ? "" : "dis");
+ }
+
+ __os_ufree(dbenv, list);
+
+ return (0);
+}
diff --git a/db/repmgr/repmgr_util.c b/db/repmgr/repmgr_util.c
new file mode 100644
index 000000000..541308384
--- /dev/null
+++ b/db/repmgr/repmgr_util.c
@@ -0,0 +1,415 @@
+/*-
+ * See the file LICENSE for redistribution information.
+ *
+ * Copyright (c) 2005-2006
+ * Oracle Corporation. All rights reserved.
+ *
+ * $Id: repmgr_util.c,v 1.27 2006/09/19 14:14:12 mjc Exp $
+ */
+
+#include "db_config.h"
+
+#define __INCLUDE_NETWORKING 1
+#include "db_int.h"
+
+/*
+ * Schedules a future attempt to re-establish a connection with the given site.
+ * Usually, we wait the configured retry_wait period. But if the "immediate"
+ * parameter is given as TRUE, we'll make the wait time 0, and put the request
+ * at the _beginning_ of the retry queue. Note how this allows us to preserve
+ * the property that the queue stays in time order simply by appending to the
+ * end.
+ *
+ * PUBLIC: int __repmgr_schedule_connection_attempt __P((DB_ENV *, u_int, int));
+ *
+ * !!!
+ * Caller should hold mutex.
+ *
+ * Unless an error occurs, we always attempt to wake the main thread;
+ * __repmgr_bust_connection relies on this behavior.
+ */
+int
+__repmgr_schedule_connection_attempt(dbenv, eid, immediate)
+ DB_ENV *dbenv;
+ u_int eid;
+ int immediate;
+{
+ DB_REP *db_rep;
+ REPMGR_SITE *site;
+ REPMGR_RETRY *retry;
+ repmgr_timeval_t t;
+ int ret;
+
+ db_rep = dbenv->rep_handle;
+ if ((ret = __os_malloc(dbenv, sizeof(*retry), &retry)) != 0)
+ return (ret);
+
+ __os_clock(dbenv, &t.tv_sec, &t.tv_usec);
+ if (immediate)
+ TAILQ_INSERT_HEAD(&db_rep->retries, retry, entries);
+ else {
+ if ((t.tv_usec += db_rep->connection_retry_wait%1000000) >
+ 1000000) {
+ t.tv_sec++;
+ t.tv_usec -= 1000000;
+ }
+ t.tv_sec += db_rep->connection_retry_wait/1000000;
+ TAILQ_INSERT_TAIL(&db_rep->retries, retry, entries);
+ }
+ retry->eid = eid;
+ memcpy(&retry->time, &t, sizeof(repmgr_timeval_t));
+
+ site = SITE_FROM_EID(eid);
+ site->state = SITE_IDLE;
+ site->ref.retry = retry;
+
+ return (__repmgr_wake_main_thread(dbenv));
+}
+
+/*
+ * Initialize the necessary control structures to begin reading a new input
+ * message.
+ *
+ * PUBLIC: void __repmgr_reset_for_reading __P((REPMGR_CONNECTION *));
+ */
+/*
+ * TODO: we need to make sure we call this: (1) initially, when we first create
+ * a connection; (2) after processing a message, to get ready to read the next
+ * one; and (3) after a connection gets successfully re-established after a
+ * failure. (Actually, #1 and #3 should probably end up being the same thing,
+ * if the code is organized properly.)
+ */
+void
+__repmgr_reset_for_reading(con)
+ REPMGR_CONNECTION *con;
+{
+ con->reading_phase = SIZES_PHASE;
+ __repmgr_iovec_init(&con->iovecs);
+ __repmgr_add_buffer(&con->iovecs, &con->msg_type,
+ sizeof(con->msg_type));
+ __repmgr_add_buffer(&con->iovecs, &con->control_size_buf,
+ sizeof(con->control_size_buf));
+ __repmgr_add_buffer(&con->iovecs, &con->rec_size_buf,
+ sizeof(con->rec_size_buf));
+}
+
+/*
+ * Constructs a DB_REPMGR_CONNECTION structure, and puts it on the main list of
+ * connections. It does not initialize eid, since that isn't needed and/or
+ * immediately known in all cases.
+ *
+ * PUBLIC: int __repmgr_new_connection __P((DB_ENV *, REPMGR_CONNECTION **,
+ * PUBLIC: socket_t, u_int32_t));
+ */
+int
+__repmgr_new_connection(dbenv, connp, s, flags)
+ DB_ENV *dbenv;
+ REPMGR_CONNECTION **connp;
+ socket_t s;
+ u_int32_t flags;
+{
+ DB_REP *db_rep;
+ REPMGR_CONNECTION *c;
+ int ret;
+
+ db_rep = dbenv->rep_handle;
+ if ((ret = __os_malloc(dbenv, sizeof(REPMGR_CONNECTION), &c)) != 0)
+ return (ret);
+
+ c->fd = s;
+ c->flags = flags;
+
+ STAILQ_INIT(&c->outbound_queue);
+ c->out_queue_length = 0;
+
+ __repmgr_reset_for_reading(c);
+ TAILQ_INSERT_TAIL(&db_rep->connections, c, entries);
+ *connp = c;
+
+ return (0);
+}
+
+/*
+ * PUBLIC: int __repmgr_new_site __P((DB_ENV *, REPMGR_SITE**,
+ * PUBLIC: const repmgr_netaddr_t *, int));
+ *
+ * !!!
+ * Caller must hold mutex.
+ */
/*
 * Adds a new site to the sites array (growing the array by doubling when
 * necessary), assigning it the next EID (its array index).  The net address
 * struct is copied by value, so the site takes over any memory it points to.
 *
 * PUBLIC: int __repmgr_new_site __P((DB_ENV *, REPMGR_SITE**,
 * PUBLIC:     const repmgr_netaddr_t *, int));
 *
 * !!!
 * Caller must hold mutex.
 */
int
__repmgr_new_site(dbenv, sitep, addr, state)
	DB_ENV *dbenv;
	REPMGR_SITE **sitep;
	const repmgr_netaddr_t *addr;
	int state;
{
	DB_REP *db_rep;
	REPMGR_SITE *site;
	u_int new_site_max, eid;
	int ret;
#ifdef DIAGNOSTIC
	DB_MSGBUF mb;
	SITE_STRING_BUFFER buffer;
#endif

	db_rep = dbenv->rep_handle;
	if (db_rep->site_cnt >= db_rep->site_max) {
#define	INITIAL_SITES_ALLOCATION	10		/* Arbitrary guess. */
		new_site_max = db_rep->site_max == 0 ?
		    INITIAL_SITES_ALLOCATION : db_rep->site_max * 2;
		if ((ret = __os_realloc(dbenv,
		     sizeof(REPMGR_SITE) * new_site_max, &db_rep->sites)) != 0)
			 return (ret);
		db_rep->site_max = new_site_max;
	}
	/* EIDs are simply indices into the sites array. */
	eid = db_rep->site_cnt++;

	site = &db_rep->sites[eid];

	memcpy(&site->net_addr, addr, sizeof(*addr));
	ZERO_LSN(site->max_ack);
	site->priority = -1;	/* OOB value indicates we don't yet know. */
	site->state = state;

	RPRINT(dbenv, (dbenv, &mb, "EID %u is assigned for %s", eid,
	    __repmgr_format_site_loc(site, buffer)));
	*sitep = site;
	return (0);
}
+
+/*
+ * Destructor for a repmgr_netaddr_t, cleans up any allocated memory pointed to
+ * by the addr.
+ *
+ * PUBLIC: void __repmgr_cleanup_netaddr __P((DB_ENV *, repmgr_netaddr_t *));
+ */
+void
+__repmgr_cleanup_netaddr(dbenv, addr)
+ DB_ENV *dbenv;
+ repmgr_netaddr_t *addr;
+{
+ if (addr->address_list != NULL) {
+ __db_freeaddrinfo(dbenv, addr->address_list);
+ addr->address_list = addr->current = NULL;
+ }
+ if (addr->host != NULL) {
+ __os_free(dbenv, addr->host);
+ addr->host = NULL;
+ }
+}
+
+/*
+ * PUBLIC: void __repmgr_iovec_init __P((REPMGR_IOVECS *));
+ */
+void
+__repmgr_iovec_init(v)
+ REPMGR_IOVECS *v;
+{
+ v->offset = v->count = 0;
+ v->total_bytes = 0;
+}
+
+/*
+ * PUBLIC: void __repmgr_add_buffer __P((REPMGR_IOVECS *, void *, size_t));
+ *
+ * !!!
+ * There is no checking for overflow of the vectors[5] array.
+ */
+void
+__repmgr_add_buffer(v, address, length)
+ REPMGR_IOVECS *v;
+ void *address;
+ size_t length;
+{
+ v->vectors[v->count].iov_base = address;
+ v->vectors[v->count++].iov_len = length;
+ v->total_bytes += length;
+}
+
+/*
+ * PUBLIC: void __repmgr_add_dbt __P((REPMGR_IOVECS *, const DBT *));
+ */
+void
+__repmgr_add_dbt(v, dbt)
+ REPMGR_IOVECS *v;
+ const DBT *dbt;
+{
+ v->vectors[v->count].iov_base = dbt->data;
+ v->vectors[v->count++].iov_len = dbt->size;
+ v->total_bytes += dbt->size;
+}
+
+/*
+ * Update a set of iovecs to reflect the number of bytes transferred in an I/O
+ * operation, so that the iovecs can be used to continue transferring where we
+ * left off.
+ * Returns TRUE if the set of buffers is now fully consumed, FALSE if more
+ * remains.
+ *
+ * PUBLIC: int __repmgr_update_consumed __P((REPMGR_IOVECS *, size_t));
+ */
/*
 * Update a set of iovecs to reflect the number of bytes transferred in an I/O
 * operation, so that the iovecs can be used to continue transferring where we
 * left off.
 *     Returns TRUE if the set of buffers is now fully consumed, FALSE if more
 * remains.
 *
 * PUBLIC: int __repmgr_update_consumed __P((REPMGR_IOVECS *, size_t));
 */
int
__repmgr_update_consumed(v, byte_count)
	REPMGR_IOVECS *v;
	size_t byte_count;
{
	db_iovec_t *iov;
	int i;

	for (i = v->offset; ; i++) {
		/*
		 * byte_count was produced by an I/O over these very vectors,
		 * so it can never exceed the bytes they describe.  (The NULL
		 * first argument means no dbenv is available to DB_ASSERT.)
		 */
		DB_ASSERT(NULL, i < v->count && byte_count > 0);
		iov = &v->vectors[i];
		if (byte_count > iov->iov_len) {
			/*
			 * We've consumed (more than) this vector's worth.
			 * Adjust count and continue.
			 */
			byte_count -= iov->iov_len;
		} else {
			/* Adjust length of remaining portion of vector. */
			iov->iov_len -= byte_count;
			if (iov->iov_len > 0) {
				/*
				 * Still some left in this vector.  Adjust base
				 * address too, and leave offset pointing here.
				 */
				iov->iov_base = (void *)
				    ((u_int8_t *)iov->iov_base + byte_count);
				v->offset = i;
			} else {
				/*
				 * Consumed exactly to a vector boundary.
				 * Advance to next vector for next time.
				 */
				v->offset = i+1;
			}
			/*
			 * If offset has reached count, the entire thing is
			 * consumed.
			 */
			return (v->offset >= v->count);
		}
	}
}
+
+/*
+ * Builds a buffer containing our network address information, suitable for
+ * publishing as cdata via a call to rep_start, and sets up the given DBT to
+ * point to it. The buffer is dynamically allocated memory, and the caller must
+ * assume responsibility for it.
+ *
+ * PUBLIC: int __repmgr_prepare_my_addr __P((DB_ENV *, DBT *));
+ */
+int
+__repmgr_prepare_my_addr(dbenv, dbt)
+ DB_ENV *dbenv;
+ DBT *dbt;
+{
+ DB_REP *db_rep;
+ size_t size, hlen;
+ u_int16_t port_buffer;
+ u_int8_t *ptr;
+ int ret;
+
+ db_rep = dbenv->rep_handle;
+
+ /*
+ * The cdata message consists of the 2-byte port number, in network byte
+ * order, followed by the null-terminated host name string.
+ */
+ port_buffer = htons(db_rep->my_addr.port);
+ size = sizeof(port_buffer) +
+ (hlen = strlen(db_rep->my_addr.host) + 1);
+ if ((ret = __os_malloc(dbenv, size, &ptr)) != 0)
+ return (ret);
+
+ DB_INIT_DBT(*dbt, ptr, size);
+
+ memcpy(ptr, &port_buffer, sizeof(port_buffer));
+ ptr = &ptr[sizeof(port_buffer)];
+ memcpy(ptr, db_rep->my_addr.host, hlen);
+
+ return (0);
+}
+
+/*
+ * PUBLIC: int __repmgr_timeval_cmp
+ * PUBLIC: __P((repmgr_timeval_t *, repmgr_timeval_t *));
+ */
+int
+__repmgr_timeval_cmp(a, b)
+ repmgr_timeval_t *a, *b;
+{
+ if (a->tv_sec == b->tv_sec) {
+ if (a->tv_usec == b->tv_usec)
+ return (0);
+ else
+ return (a->tv_usec < b->tv_usec ? -1 : 1);
+ } else
+ return (a->tv_sec < b->tv_sec ? -1 : 1);
+}
+
+/*
+ * Provide the appropriate value for nsites, the number of sites in the
+ * replication group. If the application has specified a value, use that.
+ * Otherwise, just use the number of sites we know of.
+ *
+ * PUBLIC: u_int __repmgr_get_nsites __P((DB_REP *));
+ */
+u_int
+__repmgr_get_nsites(db_rep)
+ DB_REP *db_rep;
+{
+ if (db_rep->config_nsites > 0)
+ return ((u_int)db_rep->config_nsites);
+
+ /*
+ * The number of other sites in our table, plus 1 to count ourself.
+ */
+ return (db_rep->site_cnt + 1);
+}
+
+/*
+ * PUBLIC: void __repmgr_thread_failure __P((DB_ENV *, int));
+ */
/*
 * Last-resort handler for an unrecoverable error in a repmgr thread: stop
 * the other repmgr threads, then panic the environment with the given error
 * code.  Return values are deliberately ignored; we're going down anyway.
 *
 * PUBLIC: void __repmgr_thread_failure __P((DB_ENV *, int));
 */
void
__repmgr_thread_failure(dbenv, why)
	DB_ENV *dbenv;
	int why;
{
	(void)__repmgr_stop_threads(dbenv);
	(void)__db_panic(dbenv, why);
}
+
+/*
+ * Format a printable representation of a site location, suitable for inclusion
+ * in an error message. The buffer must be at least as big as
+ * MAX_SITE_LOC_STRING.
+ *
+ * PUBLIC: char *__repmgr_format_eid_loc __P((DB_REP *, int, char *));
+ */
+char *
+__repmgr_format_eid_loc(db_rep, eid, buffer)
+ DB_REP *db_rep;
+ int eid;
+ char *buffer;
+{
+ if (IS_VALID_EID(eid))
+ return (__repmgr_format_site_loc(SITE_FROM_EID(eid), buffer));
+
+ snprintf(buffer, MAX_SITE_LOC_STRING, "(unidentified site)");
+ return (buffer);
+}
+
+/*
+ * PUBLIC: char *__repmgr_format_site_loc __P((REPMGR_SITE *, char *));
+ */
/*
 * Formats a site's host and port as "site HOST:PORT" into the caller's
 * buffer (which must hold at least MAX_SITE_LOC_STRING bytes) and returns
 * the buffer, for convenient use inside message-formatting calls.
 *
 * PUBLIC: char *__repmgr_format_site_loc __P((REPMGR_SITE *, char *));
 */
char *
__repmgr_format_site_loc(site, buffer)
	REPMGR_SITE *site;
	char *buffer;
{
	snprintf(buffer, MAX_SITE_LOC_STRING, "site %s:%lu",
	    site->net_addr.host, (u_long)site->net_addr.port);
	return (buffer);
}
diff --git a/db/repmgr/repmgr_windows.c b/db/repmgr/repmgr_windows.c
new file mode 100644
index 000000000..20c290665
--- /dev/null
+++ b/db/repmgr/repmgr_windows.c
@@ -0,0 +1,708 @@
+/*-
+ * See the file LICENSE for redistribution information.
+ *
+ * Copyright (c) 2005-2006
+ * Oracle Corporation. All rights reserved.
+ *
+ * $Id: repmgr_windows.c,v 1.14 2006/09/11 15:15:20 bostic Exp $
+ */
+
+#include "db_config.h"
+
+#define __INCLUDE_NETWORKING 1
+#include "db_int.h"
+
+typedef struct __ack_waiter {
+ HANDLE event;
+ const DB_LSN *lsnp;
+ struct __ack_waiter *next_free;
+} ACK_WAITER;
+
+#define WAITER_SLOT_IN_USE(w) ((w)->lsnp != NULL)
+
+/*
+ * Array slots [0:next_avail-1] are initialized, and either in use or on the
+ * free list. Slots beyond that are virgin territory, whose memory contents
+ * could be garbage. In particular, note that slots [0:next_avail-1] have a
+ * Win32 Event Object created for them, which have to be freed when cleaning up
+ * this data structure.
+ *
+ * "first_free" points to a list of not-in-use slots threaded through the first
+ * section of the array.
+ */
+struct __ack_waiters_table {
+ struct __ack_waiter *array;
+ int size;
+ int next_avail;
+ struct __ack_waiter *first_free;
+};
+
+static int allocate_wait_slot __P((DB_ENV *, ACK_WAITER **));
+static void free_wait_slot __P((DB_ENV *, ACK_WAITER *));
+static int handle_completion __P((DB_ENV *, REPMGR_CONNECTION *));
+static int finish_connecting __P((DB_ENV *, REPMGR_CONNECTION *,
+ LPWSANETWORKEVENTS));
+
+int
+__repmgr_thread_start(dbenv, runnable)
+ DB_ENV *dbenv;
+ REPMGR_RUNNABLE *runnable;
+{
+ HANDLE thread_id;
+
+ runnable->finished = FALSE;
+
+ thread_id = CreateThread(NULL, 0,
+ (LPTHREAD_START_ROUTINE)runnable->run, dbenv, 0, NULL);
+ if (thread_id == NULL)
+ return (GetLastError());
+ runnable->thread_id = thread_id;
+ return (0);
+}
+
+int
+__repmgr_thread_join(thread)
+ REPMGR_RUNNABLE *thread;
+{
+ if (WaitForSingleObject(thread->thread_id, INFINITE) == WAIT_OBJECT_0)
+ return (0);
+ return (GetLastError());
+}
+
+int __repmgr_set_nonblocking(s)
+ SOCKET s;
+{
+ int ret;
+ u_long arg;
+
+ arg = 1; /* any non-zero value */
+ if ((ret = ioctlsocket(s, FIONBIO, &arg)) == SOCKET_ERROR)
+ return (WSAGetLastError());
+ return (0);
+}
+
+/*
+ * Wake any send()-ing threads waiting for an acknowledgement.
+ *
+ * !!!
+ * Caller must hold the repmgr->mutex, if this thread synchronization is to work
+ * properly.
+ */
+int
+__repmgr_wake_waiting_senders(dbenv)
+ DB_ENV *dbenv;
+{
+ DB_REP *db_rep;
+ ACK_WAITER *slot;
+ int i, ret;
+
+ ret = 0;
+ db_rep = dbenv->rep_handle;
+ for (i=0; i<db_rep->waiters->next_avail; i++) {
+ slot = &db_rep->waiters->array[i];
+ if (!WAITER_SLOT_IN_USE(slot))
+ continue;
+ if (__repmgr_is_permanent(dbenv, slot->lsnp))
+ if (!SetEvent(slot->event) && ret == 0)
+ ret = GetLastError();
+ }
+ return (ret);
+}
+
+/*
+ * !!!
+ * Caller must hold mutex.
+ */
/*
 * Blocks the calling (send) thread until the given LSN has been sufficiently
 * acknowledged, the configured ack timeout expires, or an error occurs.
 *
 * SignalObjectAndWait atomically releases the repmgr mutex and waits on our
 * slot's event, so a wake-up between the release and the wait can't be lost;
 * we re-take the mutex before releasing the slot.
 *
 * NOTE(review): ret is a DWORD but may be assigned DB_REP_UNAVAIL (an int
 * error code); confirm the unsigned/signed round trip through the int return
 * type is intended.  Also note that on allocate_wait_slot failure we return
 * with the mutex still held -- consistent with the caller-holds-mutex
 * contract, but worth confirming against the callers.
 *
 * !!!
 * Caller must hold mutex.
 */
int
__repmgr_await_ack(dbenv, lsnp)
	DB_ENV *dbenv;
	const DB_LSN *lsnp;
{
	DB_REP *db_rep;
	ACK_WAITER *me;
	DWORD ret;
	DWORD timeout;

	db_rep = dbenv->rep_handle;

	if ((ret = allocate_wait_slot(dbenv, &me)) != 0)
		goto err;

	/* convert time-out from microseconds to milliseconds, rounding up */
	timeout = db_rep->ack_timeout > 0 ? ((db_rep->ack_timeout+999) / 1000) :
	    INFINITE;
	me->lsnp = lsnp;	/* marks the slot in-use */
	if ((ret = SignalObjectAndWait(db_rep->mutex, me->event, timeout,
	    FALSE)) == WAIT_FAILED) {
		ret = GetLastError();
	} else if (ret == WAIT_TIMEOUT)
		ret = DB_REP_UNAVAIL;
	else
		DB_ASSERT(dbenv, ret == WAIT_OBJECT_0);

	LOCK_MUTEX(db_rep->mutex);
	free_wait_slot(dbenv, me);

err:
	return (ret);
}
+
+/*
+ * !!!
+ * Caller must hold the mutex.
+ */
+static int
+allocate_wait_slot(dbenv, resultp)
+ DB_ENV *dbenv;
+ ACK_WAITER **resultp;
+{
+ DB_REP *db_rep;
+ ACK_WAITERS_TABLE *table;
+ ACK_WAITER *w;
+ int ret;
+
+ db_rep = dbenv->rep_handle;
+ table = db_rep->waiters;
+ if (table->first_free == NULL) {
+ if (table->next_avail >= table->size) {
+ /*
+ * Grow the array.
+ */
+ table->size *= 2;
+ w = table->array;
+ if ((ret = __os_realloc(dbenv, table->size * sizeof(*w),
+ &w)) != 0)
+ return (ret);
+ table->array = w;
+ }
+ /*
+ * Here if, one way or another, we're good to go for using the
+ * next slot (for the first time).
+ */
+ w = &table->array[table->next_avail++];
+ if ((w->event = CreateEvent(NULL, FALSE, FALSE, NULL)) ==
+ NULL) {
+ /*
+ * Maintain the sanctity of our rule that
+ * [0:next_avail-1] contain valid Event Objects.
+ */
+ --table->next_avail;
+ return (GetLastError());
+ }
+ } else {
+ w = table->first_free;
+ table->first_free = w->next_free;
+ }
+ *resultp = w;
+ return (0);
+}
+
+static void
+free_wait_slot(dbenv, slot)
+ DB_ENV *dbenv;
+ ACK_WAITER *slot;
+{
+ DB_REP *db_rep;
+
+ db_rep = dbenv->rep_handle;
+
+ slot->lsnp = NULL; /* show it's not in use */
+ slot->next_free = db_rep->waiters->first_free;
+ db_rep->waiters->first_free = slot;
+}
+
+/*
+ * Make resource allocation an all-or-nothing affair, outside of this and the
+ * close_sync function. db_rep->waiters should be non-NULL iff all of these
+ * resources have been created.
+ */
/*
 * Make resource allocation an all-or-nothing affair, outside of this and the
 * close_sync function.  db_rep->waiters should be non-NULL iff all of these
 * resources have been created.
 *
 * On any failure, every handle created so far is closed and db_rep->waiters
 * is left NULL, so REPMGR_SYNC_INITED remains false.
 */
int
__repmgr_init_sync(dbenv, db_rep)
	DB_ENV *dbenv;
	DB_REP *db_rep;
{
#define	INITIAL_ALLOCATION 5	/* arbitrary size */
	ACK_WAITERS_TABLE *table;
	int ret;

	/* Pre-NULL everything so the error path can test each handle. */
	db_rep->signaler = db_rep->queue_nonempty = db_rep->check_election =
	    db_rep->mutex = NULL;
	table = NULL;

	if ((db_rep->signaler = CreateEvent(NULL, /* security attr */
	    FALSE,		/* (not) of the manual reset variety  */
	    FALSE,		/* (not) initially signaled */
	    NULL)) == NULL)	/* name */
		goto geterr;

	if ((db_rep->queue_nonempty = CreateEvent(NULL, TRUE, FALSE, NULL))
	    == NULL)
		goto geterr;

	if ((db_rep->check_election = CreateEvent(NULL, FALSE, FALSE, NULL))
	    == NULL)
		goto geterr;

	if ((db_rep->mutex = CreateMutex(NULL, FALSE, NULL)) == NULL)
		goto geterr;

	if ((ret = __os_calloc(dbenv, 1, sizeof(ACK_WAITERS_TABLE), &table))
	    != 0)
		goto err;

	if ((ret = __os_calloc(dbenv, INITIAL_ALLOCATION, sizeof(ACK_WAITER),
	    &table->array)) != 0)
		goto err;

	table->size = INITIAL_ALLOCATION;
	table->first_free = NULL;
	table->next_avail = 0;

	/* There's a restaurant joke in there somewhere. */
	db_rep->waiters = table;
	return (0);

geterr:
	ret = GetLastError();
err:
	if (db_rep->check_election != NULL)
		CloseHandle(db_rep->check_election);
	if (db_rep->queue_nonempty != NULL)
		CloseHandle(db_rep->queue_nonempty);
	if (db_rep->signaler != NULL)
		CloseHandle(db_rep->signaler);
	if (db_rep->mutex != NULL)
		CloseHandle(db_rep->mutex);
	if (table != NULL)
		__os_free(dbenv, table);
	db_rep->waiters = NULL;
	return (ret);
}
+
/*
 * Releases everything __repmgr_init_sync created: each per-waiter event, the
 * waiters table itself, and the four environment-wide handles.  A no-op if
 * sync was never initialized.  Collects the first error but keeps closing,
 * and NULLs db_rep->waiters so REPMGR_SYNC_INITED becomes false.
 */
int
__repmgr_close_sync(dbenv)
	DB_ENV *dbenv;
{
	DB_REP *db_rep;
	int i, ret;

	db_rep = dbenv->rep_handle;
	if (!(REPMGR_SYNC_INITED(db_rep)))
		return (0);

	ret = 0;
	/* Only slots [0:next_avail-1] have valid Event Objects. */
	for (i = 0; i < db_rep->waiters->next_avail; i++) {
		if (!CloseHandle(db_rep->waiters->array[i].event) && ret == 0)
			ret = GetLastError();
	}
	__os_free(dbenv, db_rep->waiters->array);
	__os_free(dbenv, db_rep->waiters);

	if (!CloseHandle(db_rep->check_election) && ret == 0)
		ret = GetLastError();

	if (!CloseHandle(db_rep->queue_nonempty) && ret == 0)
		ret = GetLastError();

	if (!CloseHandle(db_rep->signaler) && ret == 0)
		ret = GetLastError();

	if (!CloseHandle(db_rep->mutex) && ret == 0)
		ret = GetLastError();

	db_rep->waiters = NULL;
	return (ret);
}
+
+/*
+ * Performs net-related resource initialization other than memory initialization
+ * and allocation. A valid db_rep->listen_fd acts as the "all-or-nothing"
+ * sentinel signifying that these resources are allocated (except that now the
+ * new wsa_inited flag may be used to indicate that WSAStartup has already been
+ * called).
+ */
/*
 * Performs net-related resource initialization other than memory initialization
 * and allocation.  A valid db_rep->listen_fd acts as the "all-or-nothing"
 * sentinel signifying that these resources are allocated (except that now the
 * new wsa_inited flag may be used to indicate that WSAStartup has already been
 * called).
 *
 * NOTE(review): when __repmgr_listen fails we call WSACleanup but leave
 * wsa_inited TRUE, so a retry would skip WSAStartup -- confirm this is the
 * intended reference-counting of the WSA library.
 */
int
__repmgr_net_init(dbenv, db_rep)
	DB_ENV *dbenv;
	DB_REP *db_rep;
{
	int ret;

	/* Initialize the Windows sockets DLL. */
	if (!db_rep->wsa_inited && (ret = __repmgr_wsa_init(dbenv)) != 0)
		goto err;

	if ((ret = __repmgr_listen(dbenv)) == 0)
		return (0);

	/* Listen failed: undo the WSA initialization. */
	if (WSACleanup() == SOCKET_ERROR) {
		ret = net_errno;
		__db_err(dbenv, ret, "WSACleanup");
	}

err:	db_rep->listen_fd = INVALID_SOCKET;
	return (ret);
}
+
+/*
+ * __repmgr_wsa_init --
+ *	Initialize the Windows sockets DLL.
+ *
+ * PUBLIC: int __repmgr_wsa_init __P((DB_ENV *));
+ */
+int
+__repmgr_wsa_init(dbenv)
+	DB_ENV *dbenv;
+{
+	DB_REP *db_rep;
+	WSADATA wsa_data;
+	int ret;
+
+	db_rep = dbenv->rep_handle;
+
+	/* Request Winsock version 2.2. */
+	ret = WSAStartup(MAKEWORD(2, 2), &wsa_data);
+	if (ret != 0) {
+		__db_err(dbenv, ret, "unable to initialize Windows networking");
+		return (ret);
+	}
+
+	/* Remember that WSAStartup succeeded, for later cleanup. */
+	db_rep->wsa_inited = TRUE;
+	return (0);
+}
+
+/*
+ * Acquires the repmgr mutex (a Windows mutex object), blocking as long as
+ * necessary.  Returns 0 on success, or a Windows error code.
+ */
+int
+__repmgr_lock_mutex(mutex)
+	mgr_mutex_t *mutex;
+{
+	DWORD status;
+
+	status = WaitForSingleObject(*mutex, INFINITE);
+	if (status != WAIT_OBJECT_0)
+		return (GetLastError());
+	return (0);
+}
+
+/*
+ * Releases the repmgr mutex.  Returns 0 on success, or a Windows error code.
+ */
+int
+__repmgr_unlock_mutex(mutex)
+	mgr_mutex_t *mutex;
+{
+	/* ReleaseMutex returns non-zero on success. */
+	if (!ReleaseMutex(*mutex))
+		return (GetLastError());
+	return (0);
+}
+
+/*
+ * Signals a "condition variable" (on Windows, an event object).  Returns 0
+ * on success, or a Windows error code.
+ */
+int
+__repmgr_signal(v)
+	cond_var_t *v;
+{
+	if (!SetEvent(*v))
+		return (GetLastError());
+	return (0);
+}
+
+/*
+ * Wakes the select() thread, which waits on the "signaler" event object.
+ * Returns 0 on success, or a Windows error code.
+ */
+int
+__repmgr_wake_main_thread(dbenv)
+	DB_ENV *dbenv;
+{
+	return (SetEvent(dbenv->rep_handle->signaler) ? 0 : GetLastError());
+}
+
+/*
+ * Scatter/gather write: sends the buffers described by "iovec" on socket
+ * "fd", storing the number of bytes actually sent through "byte_count_p".
+ * Returns 0 on success, or the network error code.
+ */
+int
+__repmgr_writev(fd, iovec, buf_count, byte_count_p)
+	socket_t fd;
+	db_iovec_t *iovec;
+	int buf_count;
+	size_t *byte_count_p;
+{
+	DWORD nsent;
+	int result;
+
+	/* On Windows, vectored output is done via WSASend. */
+	result = WSASend(fd, iovec, (DWORD)buf_count, &nsent, 0, NULL, NULL);
+	if (result == SOCKET_ERROR)
+		return (net_errno);
+
+	*byte_count_p = (size_t)nsent;
+	return (0);
+}
+
+/*
+ * Scatter/gather read: fills the buffers described by "iovec" from socket
+ * "fd", storing the number of bytes actually read through "xfr_count_p".
+ * Returns 0 on success, or the network error code.
+ */
+int
+__repmgr_readv(fd, iovec, buf_count, xfr_count_p)
+	socket_t fd;
+	db_iovec_t *iovec;
+	int buf_count;
+	size_t *xfr_count_p;
+{
+	DWORD flags, nread;
+	int result;
+
+	/* WSARecv requires an in/out flags argument; we pass no flags. */
+	flags = 0;
+	result = WSARecv(fd, iovec, (DWORD)buf_count, &nread, &flags,
+	    NULL, NULL);
+	if (result == SOCKET_ERROR)
+		return (net_errno);
+
+	*xfr_count_p = (size_t)nread;
+	return (0);
+}
+
+/*
+ * Calculate the time duration from now til "when", in the form of an integer
+ * (suitable for WSAWaitForMultipleEvents()), clipping the result at 0 (i.e.,
+ * avoid a negative result).
+ */
+void
+__repmgr_timeval_diff_current(dbenv, when, result)
+	DB_ENV *dbenv;
+	repmgr_timeval_t *when;
+	select_timeout_t *result;
+{
+	repmgr_timeval_t now;
+
+	__os_clock(dbenv, &now.tv_sec, &now.tv_usec);
+
+	/* If "when" is already past, clip to zero rather than go negative. */
+	if (__repmgr_timeval_cmp(when, &now) > 0)
+		*result = (when->tv_sec - now.tv_sec) * MS_PER_SEC +
+		    (when->tv_usec - now.tv_usec) / USEC_PER_MS;
+	else
+		*result = 0;
+}
+
+/*
+ * Main loop of the select() thread: repeatedly waits for (1) a wake-up from
+ * another thread via the "signaler" event, (2) an incoming connection on the
+ * listening socket, or (3) network activity on any existing connection's
+ * event object, timing out when it is time to retry an idle connection.
+ * Runs until db_rep->finished is set, or until an error occurs.  Returns 0
+ * or a (Windows or DB) error code.
+ */
+int
+__repmgr_select_loop(dbenv)
+	DB_ENV *dbenv;
+{
+	DB_REP *db_rep;
+	REPMGR_CONNECTION *conn, *next;
+	REPMGR_RETRY *retry;
+	select_timeout_t timeout;
+	WSAEVENT events[WSA_MAXIMUM_WAIT_EVENTS];
+	REPMGR_CONNECTION *connections[WSA_MAXIMUM_WAIT_EVENTS];
+	DWORD nevents, ret;
+	int flow_control, i;
+	WSAEVENT listen_event;
+	WSANETWORKEVENTS net_events;
+
+	db_rep = dbenv->rep_handle;
+
+	/* Associate an event object with FD_ACCEPT on the listening socket. */
+	if ((listen_event = WSACreateEvent()) == WSA_INVALID_EVENT) {
+		__db_err(
+		    dbenv, net_errno, "can't create event for listen socket");
+		return (net_errno);
+	}
+	if (WSAEventSelect(db_rep->listen_fd, listen_event, FD_ACCEPT) ==
+	    SOCKET_ERROR) {
+		ret = net_errno;
+		__db_err(dbenv, ret, "can't enable event for listener");
+		goto out;
+	}
+
+	LOCK_MUTEX(db_rep->mutex);
+	if ((ret = __repmgr_first_try_connections(dbenv)) != 0)
+		goto unlock;
+	flow_control = FALSE;
+	for (;;) {
+		/* Start with the two events that we always wait for. */
+		events[0] = db_rep->signaler;
+		events[1] = listen_event;
+		nevents = 2;
+
+		/*
+		 * Add an event for each surviving socket that we're interested
+		 * in.  (For now [until we implement flow control], that's all
+		 * of them, in one form or another.)
+		 * Note that even if we're suffering flow control, we
+		 * nevertheless still read if we haven't even yet gotten a
+		 * handshake.  Why?  (1) Handshakes are important; and (2) they
+		 * don't hurt anything flow-control-wise.
+		 * Loop just like TAILQ_FOREACH, except that we need to be
+		 * able to unlink a list entry.
+		 */
+		for (conn = TAILQ_FIRST(&db_rep->connections);
+		    conn != NULL;
+		    conn = next) {
+			next = TAILQ_NEXT(conn, entries);
+			/* Defunct connections are reaped here, not waited on. */
+			if (F_ISSET(conn, CONN_DEFUNCT)) {
+				__repmgr_cleanup_connection(dbenv, conn);
+				continue;
+			}
+			if (F_ISSET(conn, CONN_CONNECTING) ||
+			    !STAILQ_EMPTY(&conn->outbound_queue) ||
+			    (!flow_control || !IS_VALID_EID(conn->eid))) {
+				/* Remember which conn each event belongs to. */
+				events[nevents] = conn->event_object;
+				connections[nevents++] = conn;
+			}
+		}
+
+		/*
+		 * Decide how long to wait based on when it will next be time to
+		 * retry an idle connection.  (List items are in order, so we
+		 * only have to examine the first one.)
+		 */
+		if (TAILQ_EMPTY(&db_rep->retries))
+			timeout = WSA_INFINITE;
+		else {
+			retry = TAILQ_FIRST(&db_rep->retries);
+
+			__repmgr_timeval_diff_current(
+			    dbenv, &retry->time, &timeout);
+		}
+
+		/* Block with the mutex released so other threads can run. */
+		UNLOCK_MUTEX(db_rep->mutex);
+		ret = WSAWaitForMultipleEvents(nevents, events, FALSE, timeout,
+		    FALSE);
+		/*
+		 * NOTE(review): "finished" is read here without holding the
+		 * mutex -- presumably a benign race since it only ever goes
+		 * from FALSE to TRUE; confirm against the setter.
+		 */
+		if (db_rep->finished) {
+			ret = 0;
+			goto out;
+		}
+		LOCK_MUTEX(db_rep->mutex);
+
+		if (ret >= WSA_WAIT_EVENT_0 &&
+		    ret < WSA_WAIT_EVENT_0 + nevents) {
+			/* Index into events[]/connections[] that fired. */
+			switch (i = ret - WSA_WAIT_EVENT_0) {
+			case 0:
+				/* Another thread woke us. */
+				break;
+			case 1:
+				/* Activity on the listening socket: accept. */
+				if ((ret = WSAEnumNetworkEvents(
+				    db_rep->listen_fd, listen_event,
+				    &net_events)) == SOCKET_ERROR) {
+					ret = net_errno;
+					goto unlock;
+				}
+				DB_ASSERT(dbenv,
+				    net_events.lNetworkEvents & FD_ACCEPT);
+				if ((ret = net_events.iErrorCode[FD_ACCEPT_BIT])
+				    != 0)
+					goto unlock;
+				if ((ret = __repmgr_accept(dbenv)) != 0)
+					goto unlock;
+				break;
+			default:
+				/* Activity on an existing connection. */
+				if ((ret = handle_completion(dbenv,
+				    connections[i])) != 0)
+					goto unlock;
+				break;
+			}
+		} else if (ret == WSA_WAIT_TIMEOUT) {
+			/* Time to retry one or more idle connections. */
+			if ((ret = __repmgr_retry_connections(dbenv)) != 0)
+				goto unlock;
+		} else if (ret == WSA_WAIT_FAILED) {
+			ret = net_errno;
+			goto unlock;
+		}
+	}
+
+unlock:
+	UNLOCK_MUTEX(db_rep->mutex);
+out:
+	if (!CloseHandle(listen_event) && ret == 0)
+		ret = GetLastError();
+	return (ret);
+}
+
+/*
+ * Services whatever network events have been signaled for one connection:
+ * completion of an asynchronous connect, readiness to write, data to read,
+ * or EOF.  On a connection-level failure, "busts" the connection (which
+ * schedules a retry) rather than propagating the error.
+ */
+static int
+handle_completion(dbenv, conn)
+	DB_ENV *dbenv;
+	REPMGR_CONNECTION *conn;
+{
+	int ret;
+	WSANETWORKEVENTS events;
+
+	/* Fetch (and reset) the pending events for this socket. */
+	if ((ret = WSAEnumNetworkEvents(conn->fd, conn->event_object, &events))
+	    == SOCKET_ERROR) {
+		__db_err(dbenv, net_errno, "EnumNetworkEvents");
+		ret = DB_REP_UNAVAIL;
+		goto err;
+	}
+
+	if (F_ISSET(conn, CONN_CONNECTING)) {
+		if ((ret = finish_connecting(dbenv, conn, &events)) != 0)
+			goto err;
+	} else { /* Check both writing and reading. */
+		/*
+		 * FD_CLOSE is checked first so that we never try to write to
+		 * or read from a connection already known to be closed.
+		 */
+		if (events.lNetworkEvents & FD_CLOSE) {
+			__db_err(dbenv,
+			    events.iErrorCode[FD_CLOSE_BIT],
+			    "connection closed");
+			ret = DB_REP_UNAVAIL;
+			goto err;
+		}
+
+		if (events.lNetworkEvents & FD_WRITE) {
+			if (events.iErrorCode[FD_WRITE_BIT] != 0) {
+				__db_err(dbenv,
+				    events.iErrorCode[FD_WRITE_BIT],
+				    "error writing");
+				ret = DB_REP_UNAVAIL;
+				goto err;
+			} else if ((ret =
+			    __repmgr_write_some(dbenv, conn)) != 0)
+				goto err;
+		}
+
+		if (events.lNetworkEvents & FD_READ) {
+			if (events.iErrorCode[FD_READ_BIT] != 0) {
+				__db_err(dbenv,
+				    events.iErrorCode[FD_READ_BIT],
+				    "error reading");
+				ret = DB_REP_UNAVAIL;
+				goto err;
+			} else if ((ret =
+			    __repmgr_read_from_site(dbenv, conn)) != 0)
+				goto err;
+		}
+	}
+
+	return (0);
+
+err:	if (ret == DB_REP_UNAVAIL)
+		return (__repmgr_bust_connection(dbenv, conn, TRUE));
+	return (ret);
+}
+
+/*
+ * Completes (or fails) an asynchronous connect attempt on "conn", once its
+ * event object has been signaled with FD_CONNECT.  On success, switches the
+ * socket's event selection to reading/EOF and sends our handshake.  On
+ * failure, tries the site's next known address, if any; otherwise returns
+ * DB_REP_UNAVAIL.
+ */
+static int
+finish_connecting(dbenv, conn, events)
+	DB_ENV *dbenv;
+	REPMGR_CONNECTION *conn;
+	LPWSANETWORKEVENTS events;
+{
+	DB_REP *db_rep;
+	u_int eid;
+	int ret;
+
+	if (!(events->lNetworkEvents & FD_CONNECT)) {
+		/* TODO: Is this even possible? */
+		return (0);
+	}
+
+	F_CLR(conn, CONN_CONNECTING);
+
+	if ((ret = events->iErrorCode[FD_CONNECT_BIT]) != 0) {
+		__db_err(dbenv, ret, "connecting");
+		goto err;
+	}
+
+	/* Connected: from here on we only care about data and EOF. */
+	if (WSAEventSelect(conn->fd, conn->event_object, FD_READ | FD_CLOSE) ==
+	    SOCKET_ERROR) {
+		ret = net_errno;
+		__db_err(dbenv, ret, "setting event bits for reading");
+		return (ret);
+	}
+
+	return (__repmgr_send_handshake(dbenv, conn));
+
+err:
+	/*
+	 * The connect attempt failed.  If the site has another address to
+	 * try, discard this connection and start a fresh attempt with the
+	 * next address; otherwise report the site as unavailable.
+	 */
+	db_rep = dbenv->rep_handle;
+	eid = conn->eid;
+	DB_ASSERT(dbenv, IS_VALID_EID(eid));
+
+	if (ADDR_LIST_NEXT(&SITE_FROM_EID(eid)->net_addr) == NULL)
+		return (DB_REP_UNAVAIL);
+
+	DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
+	__repmgr_cleanup_connection(dbenv, conn);
+	ret = __repmgr_connect_site(dbenv, eid);
+	DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL);
+	return (ret);
+}