summaryrefslogtreecommitdiff
path: root/db/repmgr
diff options
context:
space:
mode:
authorPanu Matilainen <pmatilai@redhat.com>2008-03-07 14:05:28 +0200
committerPanu Matilainen <pmatilai@redhat.com>2008-03-07 14:05:28 +0200
commit2414d522bcfbd0ba9e6115eb7e2a3ba056ac2795 (patch)
treea995b380eb055a041232681c626cef5af30dccfc /db/repmgr
parent501197e5ef5ce8687aaf8bd4352f296bb7a5c0e8 (diff)
downloadlibrpm-tizen-2414d522bcfbd0ba9e6115eb7e2a3ba056ac2795.tar.gz
librpm-tizen-2414d522bcfbd0ba9e6115eb7e2a3ba056ac2795.tar.bz2
librpm-tizen-2414d522bcfbd0ba9e6115eb7e2a3ba056ac2795.zip
Remove BDB copy from the repository, it doesn't belong there
Diffstat (limited to 'db/repmgr')
-rw-r--r--db/repmgr/repmgr_elect.c384
-rw-r--r--db/repmgr/repmgr_method.c421
-rw-r--r--db/repmgr/repmgr_msg.c341
-rw-r--r--db/repmgr/repmgr_net.c1075
-rw-r--r--db/repmgr/repmgr_posix.c691
-rw-r--r--db/repmgr/repmgr_queue.c155
-rw-r--r--db/repmgr/repmgr_sel.c875
-rw-r--r--db/repmgr/repmgr_stat.c298
-rw-r--r--db/repmgr/repmgr_stub.c198
-rw-r--r--db/repmgr/repmgr_util.c426
-rw-r--r--db/repmgr/repmgr_windows.c715
11 files changed, 0 insertions, 5579 deletions
diff --git a/db/repmgr/repmgr_elect.c b/db/repmgr/repmgr_elect.c
deleted file mode 100644
index bf2b41dac..000000000
--- a/db/repmgr/repmgr_elect.c
+++ /dev/null
@@ -1,384 +0,0 @@
-/*-
- * See the file LICENSE for redistribution information.
- *
- * Copyright (c) 2005,2007 Oracle. All rights reserved.
- *
- * $Id: repmgr_elect.c,v 1.31 2007/05/17 15:15:50 bostic Exp $
- */
-
-#include "db_config.h"
-
-#define __INCLUDE_NETWORKING 1
-#include "db_int.h"
-
-static int __repmgr_is_ready __P((DB_ENV *));
-static int __repmgr_elect_main __P((DB_ENV *));
-static void *__repmgr_elect_thread __P((void *));
-static int start_election_thread __P((DB_ENV *));
-
-/*
- * Starts the election thread, or wakes up an existing one, starting off with
- * the specified operation (an election, or a call to rep_start(CLIENT), or
- * nothing). Avoid multiple concurrent elections.
- *
- * PUBLIC: int __repmgr_init_election __P((DB_ENV *, int));
- *
- * !!!
- * Caller must hold mutex.
- */
-int
-__repmgr_init_election(dbenv, initial_operation)
- DB_ENV *dbenv;
- int initial_operation;
-{
- DB_REP *db_rep;
- int ret;
-
- db_rep = dbenv->rep_handle;
- if (db_rep->finished) {
- RPRINT(dbenv, (dbenv,
- "ignoring elect thread request %d; repmgr is finished",
- initial_operation));
- return (0);
- }
-
- db_rep->operation_needed = initial_operation;
- if (db_rep->elect_thread == NULL)
- ret = start_election_thread(dbenv);
- else if (db_rep->elect_thread->finished) {
- RPRINT(dbenv, (dbenv, "join dead elect thread"));
- if ((ret = __repmgr_thread_join(db_rep->elect_thread)) != 0)
- return (ret);
- __os_free(dbenv, db_rep->elect_thread);
- db_rep->elect_thread = NULL;
- ret = start_election_thread(dbenv);
- } else {
- RPRINT(dbenv, (dbenv, "reusing existing elect thread"));
- if ((ret = __repmgr_signal(&db_rep->check_election)) != 0)
- __db_err(dbenv, ret, "can't signal election thread");
- }
- return (ret);
-}
-
-/*
- * !!!
- * Caller holds mutex.
- */
-static int
-start_election_thread(dbenv)
- DB_ENV *dbenv;
-{
- DB_REP *db_rep;
- REPMGR_RUNNABLE *elector;
- int ret;
-
- db_rep = dbenv->rep_handle;
-
- if ((ret = __os_malloc(dbenv, sizeof(REPMGR_RUNNABLE), &elector))
- != 0)
- return (ret);
- elector->dbenv = dbenv;
- elector->run = __repmgr_elect_thread;
-
- if ((ret = __repmgr_thread_start(dbenv, elector)) == 0)
- db_rep->elect_thread = elector;
- else
- __os_free(dbenv, elector);
-
- return (ret);
-}
-
-static void *
-__repmgr_elect_thread(args)
- void *args;
-{
- DB_ENV *dbenv = args;
- int ret;
-
- RPRINT(dbenv, (dbenv, "starting election thread"));
-
- if ((ret = __repmgr_elect_main(dbenv)) != 0) {
- __db_err(dbenv, ret, "election thread failed");
- __repmgr_thread_failure(dbenv, ret);
- }
-
- RPRINT(dbenv, (dbenv, "election thread is exiting"));
- return (NULL);
-}
-
-static int
-__repmgr_elect_main(dbenv)
- DB_ENV *dbenv;
-{
- DB_REP *db_rep;
- DBT my_addr;
-#ifdef DB_WIN32
- DWORD duration;
-#else
- struct timespec deadline;
-#endif
- u_int nsites, nvotes;
- int done, failure_recovery, last_op;
- int need_success, ret, succeeded, to_do;
-
- COMPQUIET(need_success, TRUE);
-
- db_rep = dbenv->rep_handle;
- last_op = 0;
- failure_recovery = succeeded = FALSE;
-
- /*
- * db_rep->operation_needed is the mechanism by which the outside world
- * (running in a different thread) tells us what it wants us to do. It
- * is obviously relevant when we're just starting up. But it can also
- * be set if a subsequent request for us to do something occurs while
- * we're still looping.
- *
- * ELECT_FAILURE_ELECTION asks us to start by doing an election, but to
- * do so in failure recovery mode. This failure recovery mode may
- * persist through several loop iterations: as long as it takes us to
- * succeed in finding a master, or until we get asked to perform a new
- * request. Thus the time for mapping ELECT_FAILURE_ELECTION to the
- * internal ELECT_ELECTION, as well as the setting of the failure
- * recovery flag, is at the point we receive the new request from
- * operation_needed (either here, or within the loop below).
- */
- LOCK_MUTEX(db_rep->mutex);
- if (db_rep->finished) {
- db_rep->elect_thread->finished = TRUE;
- UNLOCK_MUTEX(db_rep->mutex);
- return (0);
- }
- to_do = db_rep->operation_needed;
- db_rep->operation_needed = 0;
- UNLOCK_MUTEX(db_rep->mutex);
-
- /*
- * The way we are invoked determines the criterion for completion (which
- * is represented as "need_success"): if we've been asked to do an
- * election, we're only "done" when an election has actually succeeded.
- * If we're just here trying to find the master initially, then merely
- * getting a valid master_eid suffices.
- */
- switch (to_do) {
- case ELECT_FAILURE_ELECTION:
- failure_recovery = TRUE;
- to_do = ELECT_ELECTION;
- /* FALLTHROUGH */
- case ELECT_ELECTION:
- need_success = TRUE;
- break;
- case ELECT_SEEK_MASTER:
- to_do = 0; /* Caller has already called rep_start. */
- /* FALLTHROUGH */
- case ELECT_REPSTART:
- need_success = FALSE;
- break;
- default:
- DB_ASSERT(dbenv, FALSE);
- }
- /* Here, need_success has been initialized. */
-
- for (;;) {
- RPRINT(dbenv, (dbenv, "elect thread to do: %d", to_do));
- switch (to_do) {
- case ELECT_ELECTION:
- nsites = __repmgr_get_nsites(db_rep);
-
- nvotes = ELECTION_MAJORITY(nsites);
-
- /*
- * If we're doing an election because we noticed that
- * the master failed, it's reasonable to expect that the
- * master won't participate. By not waiting for its
- * vote, we can probably complete the election faster.
- * But note that we shouldn't allow this to affect
- * nvotes calculation.
- */
- if (failure_recovery) {
- nsites--;
-
- if (nsites == 1) {
- /*
- * We've just lost the only other site
- * in the group, so there's no point in
- * holding an election.
- */
- if ((ret = __repmgr_become_master(
- dbenv)) != 0)
- return (ret);
- break;
- }
- }
-
- switch (ret = __rep_elect(dbenv,
- (int)nsites, (int)nvotes, 0)) {
- case DB_REP_UNAVAIL:
- break;
-
- case 0:
- succeeded = TRUE;
- if (db_rep->takeover_pending) {
- db_rep->takeover_pending = FALSE;
- if ((ret =
- __repmgr_become_master(dbenv)) != 0)
- return (ret);
- }
- break;
-
- default:
- __db_err(
- dbenv, ret, "unexpected election failure");
- return (ret);
- }
- last_op = ELECT_ELECTION;
- break;
- case ELECT_REPSTART:
- if ((ret =
- __repmgr_prepare_my_addr(dbenv, &my_addr)) != 0)
- return (ret);
- ret = __rep_start(dbenv, &my_addr, DB_REP_CLIENT);
- __os_free(dbenv, my_addr.data);
- if (ret != 0) {
- __db_err(dbenv, ret, "rep_start");
- return (ret);
- }
- last_op = ELECT_REPSTART;
- break;
- case 0:
- /*
- * Nothing to do: this can happen the first time
- * through, on initialization.
- */
- last_op = 0;
- break;
- default:
- DB_ASSERT(dbenv, FALSE);
- }
-
- LOCK_MUTEX(db_rep->mutex);
- while (!succeeded && !__repmgr_is_ready(dbenv)) {
-#ifdef DB_WIN32
- duration = db_rep->election_retry_wait / US_PER_MS;
- ret = SignalObjectAndWait(db_rep->mutex,
- db_rep->check_election, duration, FALSE);
- LOCK_MUTEX(db_rep->mutex);
- if (ret == WAIT_TIMEOUT)
- break;
- DB_ASSERT(dbenv, ret == WAIT_OBJECT_0);
-#else
- __repmgr_compute_wait_deadline(dbenv, &deadline,
- db_rep->election_retry_wait);
- if ((ret = pthread_cond_timedwait(
- &db_rep->check_election, &db_rep->mutex, &deadline))
- == ETIMEDOUT)
- break;
- DB_ASSERT(dbenv, ret == 0);
-#endif
- }
-
- /*
- * Ways we can get here: election succeeded, sleep duration
- * expired, "operation needed", or thread shut-down command.
- *
- * If we're not yet done, figure out what to do next (which may
- * be trivially easy if we've been told explicitly, via the
- * "operation needed" flag). We must first check if we've been
- * told to do a specific operation, because that could make our
- * completion criterion more stringent. Note that we never
- * lessen our completion criterion (i.e., unlike the initial
- * case, we may leave need_success untouched here).
- */
- done = FALSE;
- if ((to_do = db_rep->operation_needed) != 0) {
- db_rep->operation_needed = 0;
- switch (to_do) {
- case ELECT_FAILURE_ELECTION:
- failure_recovery = TRUE;
- to_do = ELECT_ELECTION;
- /* FALLTHROUGH */
- case ELECT_ELECTION:
- need_success = TRUE;
- break;
- case ELECT_SEEK_MASTER:
- to_do = 0;
- break;
- default:
- break;
- }
- } else if ((done = (succeeded ||
- (!need_success && IS_VALID_EID(db_rep->master_eid)) ||
- db_rep->finished)))
- db_rep->elect_thread->finished = TRUE;
- else {
- if (last_op == ELECT_ELECTION)
- to_do = ELECT_REPSTART;
- else {
- /*
- * Generally, if what we previously did is a
- * rep_start (or nothing, which really just
- * means another thread did the rep_start before
- * turning us on), then we next do an election.
- * However, with the REP_CLIENT init policy we
- * never do an initial election.
- */
- to_do = ELECT_ELECTION;
- if (db_rep->init_policy == DB_REP_CLIENT &&
- !db_rep->found_master)
- to_do = ELECT_REPSTART;
- }
- }
-
- UNLOCK_MUTEX(db_rep->mutex);
- if (done)
- return (0);
- }
-}
-
-/*
- * Tests whether another thread has signalled for our attention.
- */
-static int
-__repmgr_is_ready(dbenv)
- DB_ENV *dbenv;
-{
- DB_REP *db_rep;
-
- db_rep = dbenv->rep_handle;
-
- RPRINT(dbenv, (dbenv,
- "repmgr elect: opcode %d, finished %d, master %d",
- db_rep->operation_needed, db_rep->finished, db_rep->master_eid));
-
- return (db_rep->operation_needed || db_rep->finished);
-}
-
-/*
- * PUBLIC: int __repmgr_become_master __P((DB_ENV *));
- */
-int
-__repmgr_become_master(dbenv)
- DB_ENV *dbenv;
-{
- DB_REP *db_rep;
- DBT my_addr;
- int ret;
-
- db_rep = dbenv->rep_handle;
- db_rep->master_eid = SELF_EID;
- db_rep->found_master = TRUE;
-
- /*
- * At the moment, it's useless to pass my address to rep_start here,
- * because rep_start ignores it in the case of MASTER. So we could
- * avoid the trouble of allocating and freeing this memory. But might
- * this conceivably change in the future?
- */
- if ((ret = __repmgr_prepare_my_addr(dbenv, &my_addr)) != 0)
- return (ret);
- ret = __rep_start(dbenv, &my_addr, DB_REP_MASTER);
- __os_free(dbenv, my_addr.data);
- __repmgr_stash_generation(dbenv);
-
- return (ret);
-}
diff --git a/db/repmgr/repmgr_method.c b/db/repmgr/repmgr_method.c
deleted file mode 100644
index 723bde7e7..000000000
--- a/db/repmgr/repmgr_method.c
+++ /dev/null
@@ -1,421 +0,0 @@
-/*-
- * See the file LICENSE for redistribution information.
- *
- * Copyright (c) 2005,2007 Oracle. All rights reserved.
- *
- * $Id: repmgr_method.c,v 1.38 2007/06/11 18:29:34 alanb Exp $
- */
-
-#include "db_config.h"
-
-#define __INCLUDE_NETWORKING 1
-#include "db_int.h"
-
-static int __repmgr_await_threads __P((DB_ENV *));
-
-/*
- * PUBLIC: int __repmgr_start __P((DB_ENV *, int, u_int32_t));
- */
-int
-__repmgr_start(dbenv, nthreads, flags)
- DB_ENV *dbenv;
- int nthreads;
- u_int32_t flags;
-{
- DB_REP *db_rep;
- DBT my_addr;
- REPMGR_RUNNABLE *selector, *messenger;
- int ret, i;
-
- db_rep = dbenv->rep_handle;
-
- /* Check that the required initialization has been done. */
- if (db_rep->my_addr.port == 0) {
- __db_errx(dbenv,
- "repmgr_set_local_site must be called before repmgr_start");
- return (EINVAL);
- }
-
- if (db_rep->selector != NULL || db_rep->finished) {
- __db_errx(dbenv,
- "DB_ENV->repmgr_start may not be called more than once");
- return (EINVAL);
- }
-
- switch (flags) {
- case DB_REP_CLIENT:
- case DB_REP_ELECTION:
- case DB_REP_MASTER:
- break;
- default:
- __db_errx(dbenv,
- "repmgr_start: unrecognized flags parameter value");
- return (EINVAL);
- }
-
- if (nthreads <= 0) {
- __db_errx(dbenv,
- "repmgr_start: nthreads parameter must be >= 1");
- return (EINVAL);
- }
-
- if ((ret =
- __os_calloc(dbenv, (u_int)nthreads, sizeof(REPMGR_RUNNABLE *),
- &db_rep->messengers)) != 0)
- return (ret);
- db_rep->nthreads = nthreads;
-
- if ((ret = __repmgr_net_init(dbenv, db_rep)) != 0 ||
- (ret = __repmgr_init_sync(dbenv, db_rep)) != 0 ||
- (ret = __rep_set_transport(dbenv, SELF_EID, __repmgr_send)) != 0)
- return (ret);
-
- /*
- * Make some sort of call to rep_start before starting other threads, to
- * ensure that incoming messages being processed always have a rep
- * context properly configured.
- */
- if ((db_rep->init_policy = flags) == DB_REP_MASTER)
- ret = __repmgr_become_master(dbenv);
- else {
- if ((ret = __repmgr_prepare_my_addr(dbenv, &my_addr)) != 0)
- return (ret);
- ret = __rep_start(dbenv, &my_addr, DB_REP_CLIENT);
- __os_free(dbenv, my_addr.data);
- if (ret == 0) {
- LOCK_MUTEX(db_rep->mutex);
- ret = __repmgr_init_election(dbenv, ELECT_SEEK_MASTER);
- UNLOCK_MUTEX(db_rep->mutex);
- }
- }
- if (ret != 0)
- return (ret);
-
- if ((ret = __os_calloc(dbenv, 1, sizeof(REPMGR_RUNNABLE), &selector))
- != 0)
- return (ret);
- selector->dbenv = dbenv;
- selector->run = __repmgr_select_thread;
- if ((ret = __repmgr_thread_start(dbenv, selector)) != 0) {
- __db_err(dbenv, ret, "can't start selector thread");
- __os_free(dbenv, selector);
- return (ret);
- } else
- db_rep->selector = selector;
-
- for (i=0; i<nthreads; i++) {
- if ((ret = __os_calloc(dbenv, 1, sizeof(REPMGR_RUNNABLE),
- &messenger)) != 0)
- return (ret);
-
- messenger->dbenv = dbenv;
- messenger->run = __repmgr_msg_thread;
- if ((ret = __repmgr_thread_start(dbenv, messenger)) != 0) {
- __os_free(dbenv, messenger);
- return (ret);
- }
- db_rep->messengers[i] = messenger;
- }
-
- return (ret);
-}
-
-/*
- * PUBLIC: int __repmgr_close __P((DB_ENV *));
- */
-int
-__repmgr_close(dbenv)
- DB_ENV *dbenv;
-{
- DB_REP *db_rep;
- int ret, t_ret;
-
- ret = 0;
- db_rep = dbenv->rep_handle;
- if (db_rep->selector != NULL) {
- RPRINT(dbenv, (dbenv, "Stopping repmgr threads"));
- ret = __repmgr_stop_threads(dbenv);
- if ((t_ret = __repmgr_await_threads(dbenv)) != 0 && ret == 0)
- ret = t_ret;
- RPRINT(dbenv, (dbenv, "Repmgr threads are finished"));
- }
-
- if ((t_ret = __repmgr_net_close(dbenv)) != 0 && ret == 0)
- ret = t_ret;
-
- if ((t_ret = __repmgr_close_sync(dbenv)) != 0 && ret == 0)
- ret = t_ret;
-
- return (ret);
-}
-
-/*
- * PUBLIC: int __repmgr_set_ack_policy __P((DB_ENV *, int));
- */
-int
-__repmgr_set_ack_policy(dbenv, policy)
- DB_ENV *dbenv;
- int policy;
-{
- switch (policy) {
- case DB_REPMGR_ACKS_ALL: /* FALLTHROUGH */
- case DB_REPMGR_ACKS_ALL_PEERS: /* FALLTHROUGH */
- case DB_REPMGR_ACKS_NONE: /* FALLTHROUGH */
- case DB_REPMGR_ACKS_ONE: /* FALLTHROUGH */
- case DB_REPMGR_ACKS_ONE_PEER: /* FALLTHROUGH */
- case DB_REPMGR_ACKS_QUORUM:
- dbenv->rep_handle->perm_policy = policy;
- return (0);
- default:
- __db_errx(dbenv,
- "Unknown ack_policy in DB_ENV->repmgr_set_ack_policy");
- return (EINVAL);
- }
-}
-
-/*
- * PUBLIC: int __repmgr_get_ack_policy __P((DB_ENV *, int *));
- */
-int
-__repmgr_get_ack_policy(dbenv, policy)
- DB_ENV *dbenv;
- int *policy;
-{
- *policy = dbenv->rep_handle->perm_policy;
- return (0);
-}
-
-/*
- * PUBLIC: int __repmgr_env_create __P((DB_ENV *, DB_REP *));
- */
-int
-__repmgr_env_create(dbenv, db_rep)
- DB_ENV *dbenv;
- DB_REP *db_rep;
-{
- int ret;
-
- /* Set some default values. */
- db_rep->ack_timeout = 1 * US_PER_SEC; /* 1 second */
- db_rep->connection_retry_wait = 30 * US_PER_SEC; /* 30 seconds */
- db_rep->election_retry_wait = 10 * US_PER_SEC; /* 10 seconds */
- db_rep->config_nsites = 0;
- db_rep->peer = DB_EID_INVALID;
- db_rep->perm_policy = DB_REPMGR_ACKS_QUORUM;
-
-#ifdef DB_WIN32
- db_rep->waiters = NULL;
-#else
- db_rep->read_pipe = db_rep->write_pipe = -1;
-#endif
- if ((ret = __repmgr_net_create(dbenv, db_rep)) == 0)
- ret = __repmgr_queue_create(dbenv, db_rep);
-
- return (ret);
-}
-
-/*
- * PUBLIC: void __repmgr_env_destroy __P((DB_ENV *, DB_REP *));
- */
-void
-__repmgr_env_destroy(dbenv, db_rep)
- DB_ENV *dbenv;
- DB_REP *db_rep;
-{
- __repmgr_queue_destroy(dbenv);
- __repmgr_net_destroy(dbenv, db_rep);
- if (db_rep->messengers != NULL) {
- __os_free(dbenv, db_rep->messengers);
- db_rep->messengers = NULL;
- }
-}
-
-/*
- * PUBLIC: int __repmgr_stop_threads __P((DB_ENV *));
- */
-int
-__repmgr_stop_threads(dbenv)
- DB_ENV *dbenv;
-{
- DB_REP *db_rep;
- int ret;
-
- db_rep = dbenv->rep_handle;
-
- /*
- * Hold mutex for the purpose of waking up threads, but then get out of
- * the way to let them clean up and exit.
- */
- LOCK_MUTEX(db_rep->mutex);
- db_rep->finished = TRUE;
- if (db_rep->elect_thread != NULL &&
- (ret = __repmgr_signal(&db_rep->check_election)) != 0)
- goto unlock;
-
- if ((ret = __repmgr_signal(&db_rep->queue_nonempty)) != 0)
- goto unlock;
- UNLOCK_MUTEX(db_rep->mutex);
-
- return (__repmgr_wake_main_thread(dbenv));
-
-unlock:
- UNLOCK_MUTEX(db_rep->mutex);
- return (ret);
-}
-
-static int
-__repmgr_await_threads(dbenv)
- DB_ENV *dbenv;
-{
- DB_REP *db_rep;
- REPMGR_RUNNABLE *messenger;
- int ret, t_ret, i;
-
- db_rep = dbenv->rep_handle;
- ret = 0;
- if (db_rep->elect_thread != NULL) {
- ret = __repmgr_thread_join(db_rep->elect_thread);
- __os_free(dbenv, db_rep->elect_thread);
- db_rep->elect_thread = NULL;
- }
-
- for (i=0; i<db_rep->nthreads && db_rep->messengers[i] != NULL; i++) {
- messenger = db_rep->messengers[i];
- if ((t_ret = __repmgr_thread_join(messenger)) != 0 && ret == 0)
- ret = t_ret;
- __os_free(dbenv, messenger);
- db_rep->messengers[i] = NULL; /* necessary? */
- }
- __os_free(dbenv, db_rep->messengers);
- db_rep->messengers = NULL;
-
- if (db_rep->selector != NULL) {
- if ((t_ret = __repmgr_thread_join(db_rep->selector)) != 0 &&
- ret == 0)
- ret = t_ret;
- __os_free(dbenv, db_rep->selector);
- db_rep->selector = NULL;
- }
-
- return (ret);
-}
-
-/*
- * PUBLIC: int __repmgr_set_local_site __P((DB_ENV *, const char *, u_int,
- * PUBLIC: u_int32_t));
- */
-int
-__repmgr_set_local_site(dbenv, host, port, flags)
- DB_ENV *dbenv;
- const char *host;
- u_int port;
- u_int32_t flags;
-{
- ADDRINFO *address_list;
- DB_REP *db_rep;
- repmgr_netaddr_t addr;
- int locked, ret;
- const char *sharable_host;
- char buffer[MAXHOSTNAMELEN];
-
- if (flags != 0)
- return (__db_ferr(dbenv, "DB_ENV->repmgr_set_local_site", 0));
-
- db_rep = dbenv->rep_handle;
- if (db_rep->my_addr.port != 0) {
- __db_errx(dbenv, "Listen address already set");
- return (EINVAL);
- }
-
- /*
- * If we haven't been given a sharable local host name, we must get one
- * for the purpose of sharing with our friends, but it's ok to use for
- * the address look-up. "Helpfully" converting special names such as
- * "localhost" in this same way is tempting, but in practice turns out
- * to be too tricky.
- */
- if (host == NULL) {
- if ((ret = gethostname(buffer, sizeof(buffer))) != 0)
- return (net_errno);
-
- /* In case truncation leaves no terminating NUL byte: */
- buffer[sizeof(buffer) - 1] = '\0';
- sharable_host = buffer;
- } else
- sharable_host = host;
-
- if ((ret = __repmgr_getaddr(dbenv,
- host, port, AI_PASSIVE, &address_list)) != 0)
- return (ret);
-
- if ((ret = __repmgr_pack_netaddr(dbenv,
- sharable_host, port, address_list, &addr)) != 0) {
- __db_freeaddrinfo(dbenv, address_list);
- return (ret);
- }
-
- if (REPMGR_SYNC_INITED(db_rep)) {
- LOCK_MUTEX(db_rep->mutex);
- locked = TRUE;
- } else
- locked = FALSE;
-
- memcpy(&db_rep->my_addr, &addr, sizeof(addr));
-
- if (locked)
- UNLOCK_MUTEX(db_rep->mutex);
- return (0);
-}
-
-/*
- * If the application only calls this method from a single thread (e.g., during
- * its initialization), it will avoid the problems with the non-thread-safe host
- * name lookup. In any case, if we relegate the blocking lookup to here it
- * won't affect our select() loop.
- *
- * PUBLIC: int __repmgr_add_remote_site __P((DB_ENV *, const char *, u_int,
- * PUBLIC: int *, u_int32_t));
- */
-int
-__repmgr_add_remote_site(dbenv, host, port, eidp, flags)
- DB_ENV *dbenv;
- const char *host;
- u_int port;
- int *eidp;
- u_int32_t flags;
-{
- DB_REP *db_rep;
- REPMGR_SITE *site;
- int eid, locked, ret;
-
- if ((ret = __db_fchk(dbenv,
- "DB_ENV->repmgr_add_remote_site", flags, DB_REPMGR_PEER)) != 0)
- return (ret);
-
- if (host == NULL) {
- __db_errx(dbenv,
- "repmgr_add_remote_site: host name is required");
- return (EINVAL);
- }
-
- db_rep = dbenv->rep_handle;
-
- if (REPMGR_SYNC_INITED(db_rep)) {
- LOCK_MUTEX(db_rep->mutex);
- locked = TRUE;
- } else
- locked = FALSE;
-
- if ((ret = __repmgr_add_site(dbenv, host, port, &site)) != 0)
- goto unlock;
- eid = EID_FROM_SITE(site);
-
- if (LF_ISSET(DB_REPMGR_PEER))
- db_rep->peer = eid;
- if (eidp != NULL)
- *eidp = eid;
-
-unlock: if (locked)
- UNLOCK_MUTEX(db_rep->mutex);
- return (ret);
-}
diff --git a/db/repmgr/repmgr_msg.c b/db/repmgr/repmgr_msg.c
deleted file mode 100644
index 6b70bbbe1..000000000
--- a/db/repmgr/repmgr_msg.c
+++ /dev/null
@@ -1,341 +0,0 @@
-/*-
- * See the file LICENSE for redistribution information.
- *
- * Copyright (c) 2005,2007 Oracle. All rights reserved.
- *
- * $Id: repmgr_msg.c,v 1.36 2007/06/11 18:29:34 alanb Exp $
- */
-
-#include "db_config.h"
-
-#define __INCLUDE_NETWORKING 1
-#include "db_int.h"
-
-static int message_loop __P((DB_ENV *));
-static int process_message __P((DB_ENV*, DBT*, DBT*, int));
-static int handle_newsite __P((DB_ENV *, const DBT *));
-static int ack_message __P((DB_ENV *, u_int32_t, DB_LSN *));
-
-/*
- * PUBLIC: void *__repmgr_msg_thread __P((void *));
- */
-void *
-__repmgr_msg_thread(args)
- void *args;
-{
- DB_ENV *dbenv = args;
- int ret;
-
- if ((ret = message_loop(dbenv)) != 0) {
- __db_err(dbenv, ret, "message thread failed");
- __repmgr_thread_failure(dbenv, ret);
- }
- return (NULL);
-}
-
-static int
-message_loop(dbenv)
- DB_ENV *dbenv;
-{
- REPMGR_MESSAGE *msg;
- int ret;
-
- while ((ret = __repmgr_queue_get(dbenv, &msg)) == 0) {
- while ((ret = process_message(dbenv, &msg->control, &msg->rec,
- msg->originating_eid)) == DB_LOCK_DEADLOCK)
- RPRINT(dbenv, (dbenv, "repmgr deadlock retry"));
-
- __os_free(dbenv, msg);
- if (ret != 0)
- return (ret);
- }
-
- return (ret == DB_REP_UNAVAIL ? 0 : ret);
-}
-
-static int
-process_message(dbenv, control, rec, eid)
- DB_ENV *dbenv;
- DBT *control, *rec;
- int eid;
-{
- DB_REP *db_rep;
- REP *rep;
- DB_LSN permlsn;
- int ret;
- u_int32_t generation;
-
- db_rep = dbenv->rep_handle;
-
- /*
- * Save initial generation number, in case it changes in a close race
- * with a NEWMASTER. See msgdir.10000/10039/msg00086.html.
- */
- generation = db_rep->generation;
-
- switch (ret =
- __rep_process_message(dbenv, control, rec, eid, &permlsn)) {
- case 0:
- if (db_rep->takeover_pending) {
- db_rep->takeover_pending = FALSE;
- return (__repmgr_become_master(dbenv));
- }
- break;
-
- case DB_REP_NEWSITE:
- return (handle_newsite(dbenv, rec));
-
- case DB_REP_HOLDELECTION:
- LOCK_MUTEX(db_rep->mutex);
- ret = __repmgr_init_election(dbenv, ELECT_ELECTION);
- UNLOCK_MUTEX(db_rep->mutex);
- if (ret != 0)
- return (ret);
- break;
-
- case DB_REP_DUPMASTER:
- if ((ret = __repmgr_repstart(dbenv, DB_REP_CLIENT)) != 0)
- return (ret);
- LOCK_MUTEX(db_rep->mutex);
- ret = __repmgr_init_election(dbenv, ELECT_ELECTION);
- UNLOCK_MUTEX(db_rep->mutex);
- if (ret != 0)
- return (ret);
- break;
-
- case DB_REP_ISPERM:
- /*
- * Don't bother sending ack if master doesn't care about it.
- */
- rep = db_rep->region;
- if (db_rep->perm_policy == DB_REPMGR_ACKS_NONE ||
- (IS_PEER_POLICY(db_rep->perm_policy) &&
- rep->priority == 0))
- break;
-
- if ((ret = ack_message(dbenv, generation, &permlsn)) != 0)
- return (ret);
-
- break;
-
- case DB_REP_NOTPERM: /* FALLTHROUGH */
- case DB_REP_IGNORE: /* FALLTHROUGH */
- case DB_LOCK_DEADLOCK:
- break;
-
- default:
- __db_err(dbenv, ret, "DB_ENV->rep_process_message");
- return (ret);
- }
- return (0);
-}
-
-/*
- * Handle replication-related events. Returns only 0 or DB_EVENT_NOT_HANDLED;
- * no other error returns are tolerated.
- *
- * PUBLIC: int __repmgr_handle_event __P((DB_ENV *, u_int32_t, void *));
- */
-int
-__repmgr_handle_event(dbenv, event, info)
- DB_ENV *dbenv;
- u_int32_t event;
- void *info;
-{
- DB_REP *db_rep;
-
- db_rep = dbenv->rep_handle;
-
- if (db_rep->selector == NULL) {
- /* Repmgr is not in use, so all events go to application. */
- return (DB_EVENT_NOT_HANDLED);
- }
-
- switch (event) {
- case DB_EVENT_REP_ELECTED:
- DB_ASSERT(dbenv, info == NULL);
-
- db_rep->found_master = TRUE;
- db_rep->takeover_pending = TRUE;
-
- /*
- * The application doesn't really need to see this, because the
- * purpose of this event is to tell the winning site that it
- * should call rep_start(MASTER), and in repmgr we do that
- * automatically. Still, they could conceivably be curious, and
- * it doesn't hurt anything to let them know.
- */
- break;
- case DB_EVENT_REP_NEWMASTER:
- DB_ASSERT(dbenv, info != NULL);
-
- db_rep->found_master = TRUE;
- db_rep->master_eid = *(int *)info;
- __repmgr_stash_generation(dbenv);
-
- /* Application still needs to see this. */
- break;
- default:
- break;
- }
- return (DB_EVENT_NOT_HANDLED);
-}
-
-/*
- * Acknowledges a message.
- *
- * !!!
- * Note that this cannot be called from the select() thread, in case we call
- * __repmgr_bust_connection(..., FALSE).
- */
-static int
-ack_message(dbenv, generation, lsn)
- DB_ENV *dbenv;
- u_int32_t generation;
- DB_LSN *lsn;
-{
- DB_REP *db_rep;
- REPMGR_SITE *site;
- REPMGR_CONNECTION *conn;
- DB_REPMGR_ACK ack;
- DBT control2, rec2;
- int ret;
-
- db_rep = dbenv->rep_handle;
- /*
- * Regardless of where a message came from, all ack's go to the master
- * site. If we're not in touch with the master, we drop it, since
- * there's not much else we can do.
- */
- if (!IS_VALID_EID(db_rep->master_eid) ||
- db_rep->master_eid == SELF_EID) {
- RPRINT(dbenv, (dbenv,
- "dropping ack with master %d", db_rep->master_eid));
- return (0);
- }
-
- ret = 0;
- LOCK_MUTEX(db_rep->mutex);
- site = SITE_FROM_EID(db_rep->master_eid);
- if (site->state == SITE_CONNECTED &&
- !F_ISSET(site->ref.conn, CONN_CONNECTING)) {
-
- ack.generation = generation;
- memcpy(&ack.lsn, lsn, sizeof(DB_LSN));
- control2.data = &ack;
- control2.size = sizeof(ack);
- rec2.size = 0;
-
- conn = site->ref.conn;
- if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_ACK,
- &control2, &rec2)) == DB_REP_UNAVAIL)
- ret = __repmgr_bust_connection(dbenv, conn, FALSE);
- }
-
- UNLOCK_MUTEX(db_rep->mutex);
- return (ret);
-}
-
-/*
- * Does everything necessary to handle the processing of a NEWSITE return.
- */
-static int
-handle_newsite(dbenv, rec)
- DB_ENV *dbenv;
- const DBT *rec;
-{
- ADDRINFO *ai;
- DB_REP *db_rep;
- REPMGR_SITE *site;
- SITE_STRING_BUFFER buffer;
- repmgr_netaddr_t *addr;
- size_t hlen;
- u_int16_t port;
- int ret;
- char *host;
-
- db_rep = dbenv->rep_handle;
- /*
- * Check if we got sent connect information and if we did, if
- * this is me or if we already have a connection to this new
- * site. If we don't, establish a new one.
- *
- * Unmarshall the cdata: a 2-byte port number, in network byte order,
- * followed by the host name string, which should already be
- * null-terminated, but let's make sure.
- */
- if (rec->size < sizeof(port) + 1) {
- __db_errx(dbenv, "unexpected cdata size, msg ignored");
- return (0);
- }
- memcpy(&port, rec->data, sizeof(port));
- port = ntohs(port);
-
- host = (char*)((u_int8_t*)rec->data + sizeof(port));
- hlen = (rec->size - sizeof(port)) - 1;
- host[hlen] = '\0';
-
- /* It's me, do nothing. */
- if (strcmp(host, db_rep->my_addr.host) == 0 &&
- port == db_rep->my_addr.port) {
- RPRINT(dbenv, (dbenv, "repmgr ignores own NEWSITE info"));
- return (0);
- }
-
- LOCK_MUTEX(db_rep->mutex);
- if ((ret = __repmgr_add_site(dbenv, host, port, &site)) == EEXIST) {
- RPRINT(dbenv, (dbenv,
- "NEWSITE info from %s was already known",
- __repmgr_format_site_loc(site, buffer)));
- /*
- * In case we already know about this site only because it
- * first connected to us, we may not yet have had a chance to
- * look up its addresses. Even though we don't need them just
- * now, this is an advantageous opportunity to get them since we
- * can do so away from the critical select thread. Give up only
- * for a disastrous failure.
- */
- addr = &site->net_addr;
- if (addr->address_list == NULL) {
- if ((ret = __repmgr_getaddr(dbenv,
- addr->host, addr->port, 0, &ai)) == 0)
- addr->address_list = ai;
- else if (ret != DB_REP_UNAVAIL)
- goto unlock;
- }
-
- ret = 0;
- if (site->state == SITE_CONNECTED)
- goto unlock; /* Nothing to do. */
- } else {
- if (ret != 0)
- goto unlock;
- RPRINT(dbenv, (dbenv, "NEWSITE info added %s",
- __repmgr_format_site_loc(site, buffer)));
- }
-
- /*
- * Wake up the main thread to connect to the new or reawakened
- * site.
- */
- ret = __repmgr_wake_main_thread(dbenv);
-
-unlock: UNLOCK_MUTEX(db_rep->mutex);
- return (ret);
-}
-
-/*
- * PUBLIC: void __repmgr_stash_generation __P((DB_ENV *));
- */
-void
-__repmgr_stash_generation(dbenv)
- DB_ENV *dbenv;
-{
- DB_REP *db_rep;
- REP *rep;
-
- db_rep = dbenv->rep_handle;
- rep = db_rep->region;
-
- db_rep->generation = rep->gen;
-}
diff --git a/db/repmgr/repmgr_net.c b/db/repmgr/repmgr_net.c
deleted file mode 100644
index 95fdeee1f..000000000
--- a/db/repmgr/repmgr_net.c
+++ /dev/null
@@ -1,1075 +0,0 @@
-/*-
- * See the file LICENSE for redistribution information.
- *
- * Copyright (c) 2005,2007 Oracle. All rights reserved.
- *
- * $Id: repmgr_net.c,v 1.55 2007/06/11 18:29:34 alanb Exp $
- */
-
-#include "db_config.h"
-
-#define __INCLUDE_NETWORKING 1
-#include "db_int.h"
-#include "dbinc/mp.h"
-
-/*
- * The functions in this module implement a simple wire protocol for
- * transmitting messages, both replication messages and our own internal control
- * messages. The protocol is as follows:
- *
- * 1 byte - message type (defined in repmgr_int.h)
- * 4 bytes - size of control
- * 4 bytes - size of rec
- * ? bytes - control
- * ? bytes - rec
- *
- * where both sizes are 32-bit binary integers in network byte order.
- * Either control or rec can have zero length, but even in this case the
- * 4-byte length will be present.
- * Putting both lengths right up at the front allows us to read in fewer
- * phases, and allows us to allocate buffer space for both parts (plus a wrapper
- * struct) at once.
- */
-
-/*
- * In sending a message, we first try to send it in-line, in the sending thread,
- * and without first copying the message, by using scatter/gather I/O, using
- * iovecs to point to the various pieces of the message. If that all works
- * without blocking, that's optimal.
- * If we find that, for a particular connection, we can't send without
- * blocking, then we must copy the message for sending later in the select()
- * thread. In the course of doing that, we might as well "flatten" the message,
- * forming one single buffer, to simplify life. Not only that, once we've gone
- * to the trouble of doing that, other sites to which we also want to send the
- * message (in the case of a broadcast), may as well take advantage of the
- * simplified structure also.
- * This structure holds it all. Note that this structure, and the
- * "flat_msg" structure, are allocated separately, because (1) the flat_msg
- * version is usually not needed; and (2) when it is needed, it will need to
- * live longer than the wrapping sending_msg structure.
- * Note that, for the broadcast case, where we're going to use this
- * repeatedly, the iovecs is a template that must be copied, since in normal use
- * the iovecs pointers and lengths get adjusted after every partial write.
- */
-struct sending_msg {
- REPMGR_IOVECS iovecs;
- u_int8_t type;
- u_int32_t control_size_buf, rec_size_buf;
- REPMGR_FLAT *fmsg;
-};
-
-static int __repmgr_send_broadcast
- __P((DB_ENV *, const DBT *, const DBT *, u_int *, u_int *));
-static void setup_sending_msg
- __P((struct sending_msg *, u_int, const DBT *, const DBT *));
-static int __repmgr_send_internal
- __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *));
-static int enqueue_msg
- __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, size_t));
-static int flatten __P((DB_ENV *, struct sending_msg *));
-static REPMGR_SITE *__repmgr_available_site __P((DB_ENV *, int));
-
-/*
- * __repmgr_send --
- * The send function for DB_ENV->rep_set_transport.
- *
- * !!!
- * This is only ever called as the replication transport call-back, which means
- * it's either on one of our message processing threads or an application
- * thread. It mustn't be called from the select() thread, because we might call
- * __repmgr_bust_connection(..., FALSE) here, and that's not allowed in the
- * select() thread.
- *
- * PUBLIC: int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *,
- * PUBLIC: const DB_LSN *, int, u_int32_t));
- */
-int
-__repmgr_send(dbenv, control, rec, lsnp, eid, flags)
- DB_ENV *dbenv;
- const DBT *control, *rec;
- const DB_LSN *lsnp;
- int eid;
- u_int32_t flags;
-{
- DB_REP *db_rep;
- u_int nsites, npeers, available, needed;
- int ret, t_ret;
- REPMGR_SITE *site;
- REPMGR_CONNECTION *conn;
-
- db_rep = dbenv->rep_handle;
-
- LOCK_MUTEX(db_rep->mutex);
- if (eid == DB_EID_BROADCAST) {
- if ((ret = __repmgr_send_broadcast(dbenv, control, rec,
- &nsites, &npeers)) != 0)
- goto out;
- } else {
- /*
- * If this is a request that can be sent anywhere, then see if
- * we can send it to our peer (to save load on the master), but
- * not if it's a rerequest, 'cuz that likely means we tried this
- * already and failed.
- */
- if ((flags & (DB_REP_ANYWHERE | DB_REP_REREQUEST)) ==
- DB_REP_ANYWHERE &&
- IS_VALID_EID(db_rep->peer) &&
- (site = __repmgr_available_site(dbenv, db_rep->peer)) !=
- NULL) {
- RPRINT(dbenv, (dbenv, "sending request to peer"));
- } else if ((site = __repmgr_available_site(dbenv, eid)) ==
- NULL) {
- RPRINT(dbenv, (dbenv,
- "ignoring message sent to unavailable site"));
- ret = DB_REP_UNAVAIL;
- goto out;
- }
-
- conn = site->ref.conn;
- if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_REP_MESSAGE,
- control, rec)) == DB_REP_UNAVAIL &&
- (t_ret = __repmgr_bust_connection(dbenv, conn, FALSE)) != 0)
- ret = t_ret;
- if (ret != 0)
- goto out;
-
- nsites = 1;
- npeers = site->priority > 0 ? 1 : 0;
- }
- /*
- * Right now, nsites and npeers represent the (maximum) number of sites
- * we've attempted to begin sending the message to. Of course we
- * haven't really received any ack's yet. But since we've only sent to
- * nsites/npeers other sites, that's the maximum number of ack's we
- * could possibly expect. If even that number fails to satisfy our PERM
- * policy, there's no point waiting for something that will never
- * happen.
- */
- if (LF_ISSET(DB_REP_PERMANENT)) {
- switch (db_rep->perm_policy) {
- case DB_REPMGR_ACKS_NONE:
- goto out;
-
- case DB_REPMGR_ACKS_ONE:
- needed = 1;
- available = nsites;
- break;
-
- case DB_REPMGR_ACKS_ALL:
- /* Number of sites in the group besides myself. */
- needed = __repmgr_get_nsites(db_rep) - 1;
- available = nsites;
- break;
-
- case DB_REPMGR_ACKS_ONE_PEER:
- needed = 1;
- available = npeers;
- break;
-
- case DB_REPMGR_ACKS_ALL_PEERS:
- /*
- * Too hard to figure out "needed", since we're not
- * keeping track of how many peers we have; so just skip
- * the optimization in this case.
- */
- needed = 1;
- available = npeers;
- break;
-
- case DB_REPMGR_ACKS_QUORUM:
- /*
- * The minimum number of acks necessary to ensure that
- * the transaction is durable if an election is held.
- */
- needed = (__repmgr_get_nsites(db_rep) - 1) / 2;
- available = npeers;
- break;
-
- default:
- COMPQUIET(available, 0);
- COMPQUIET(needed, 0);
- (void)__db_unknown_path(dbenv, "__repmgr_send");
- break;
- }
- if (available < needed) {
- ret = DB_REP_UNAVAIL;
- goto out;
- }
- /* In ALL_PEERS case, display of "needed" might be confusing. */
- RPRINT(dbenv, (dbenv,
- "will await acknowledgement: need %u", needed));
- ret = __repmgr_await_ack(dbenv, lsnp);
- }
-
-out: UNLOCK_MUTEX(db_rep->mutex);
- if (ret != 0 && LF_ISSET(DB_REP_PERMANENT)) {
- STAT(db_rep->region->mstat.st_perm_failed++);
- DB_EVENT(dbenv, DB_EVENT_REP_PERM_FAILED, NULL);
- }
- return (ret);
-}
-
-static REPMGR_SITE *
-__repmgr_available_site(dbenv, eid)
- DB_ENV *dbenv;
- int eid;
-{
- DB_REP *db_rep;
- REPMGR_SITE *site;
-
- db_rep = dbenv->rep_handle;
- site = SITE_FROM_EID(eid);
- if (site->state != SITE_CONNECTED)
- return (NULL);
-
- if (F_ISSET(site->ref.conn, CONN_CONNECTING))
- return (NULL);
- return (site);
-}
-
-/*
- * Sends message to all sites with which we currently have an active
- * connection. Sets result parameters according to how many sites we attempted
- * to begin sending to, even if we did nothing more than queue it for later
- * delivery.
- *
- * !!!
- * Caller must hold dbenv->mutex.
- *
- * !!!
- * Note that this cannot be called from the select() thread, in case we call
- * __repmgr_bust_connection(..., FALSE).
- */
-static int
-__repmgr_send_broadcast(dbenv, control, rec, nsitesp, npeersp)
- DB_ENV *dbenv;
- const DBT *control, *rec;
- u_int *nsitesp, *npeersp;
-{
- DB_REP *db_rep;
- struct sending_msg msg;
- REPMGR_CONNECTION *conn;
- REPMGR_SITE *site;
- u_int nsites, npeers;
- int ret;
-
- db_rep = dbenv->rep_handle;
-
- setup_sending_msg(&msg, REPMGR_REP_MESSAGE, control, rec);
- nsites = npeers = 0;
-
- /*
- * Traverse the connections list. Here, even in bust_connection, we
- * don't unlink the current list entry, so we can use the TAILQ_FOREACH
- * macro.
- */
- TAILQ_FOREACH(conn, &db_rep->connections, entries) {
- if (F_ISSET(conn, CONN_CONNECTING | CONN_DEFUNCT) ||
- !IS_VALID_EID(conn->eid))
- continue;
-
- if ((ret = __repmgr_send_internal(dbenv, conn, &msg)) == 0) {
- site = SITE_FROM_EID(conn->eid);
- nsites++;
- if (site->priority > 0)
- npeers++;
- } else if (ret == DB_REP_UNAVAIL) {
- if ((ret = __repmgr_bust_connection(
- dbenv, conn, FALSE)) != 0)
- return (ret);
- } else
- return (ret);
- }
-
- *nsitesp = nsites;
- *npeersp = npeers;
- return (0);
-}
-
-/*
- * __repmgr_send_one --
- * Send a message to a site, or if you can't just yet, make a copy of it
- * and arrange to have it sent later. 'rec' may be NULL, in which case we send
- * a zero length and no data.
- *
- * If we get an error, we take care of cleaning up the connection (calling
- * __repmgr_bust_connection()), so that the caller needn't do so.
- *
- * !!!
- * Note that the mutex should be held through this call.
- * It doubles as a synchronizer to make sure that two threads don't
- * intersperse writes that are part of two single messages.
- *
- * PUBLIC: int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *,
- * PUBLIC: u_int, const DBT *, const DBT *));
- */
-int
-__repmgr_send_one(dbenv, conn, msg_type, control, rec)
- DB_ENV *dbenv;
- REPMGR_CONNECTION *conn;
- u_int msg_type;
- const DBT *control, *rec;
-{
- struct sending_msg msg;
-
- setup_sending_msg(&msg, msg_type, control, rec);
- return (__repmgr_send_internal(dbenv, conn, &msg));
-}
-
-/*
- * Attempts a "best effort" to send a message on the given site. If there is an
- * excessive backlog of message already queued on the connection, we simply drop
- * this message, and still return 0 even in this case.
- */
-static int
-__repmgr_send_internal(dbenv, conn, msg)
- DB_ENV *dbenv;
- REPMGR_CONNECTION *conn;
- struct sending_msg *msg;
-{
-#define OUT_QUEUE_LIMIT 10 /* arbitrary, for now */
- REPMGR_IOVECS iovecs;
- SITE_STRING_BUFFER buffer;
- int ret;
- size_t nw;
- size_t total_written;
-
- DB_ASSERT(dbenv, !F_ISSET(conn, CONN_CONNECTING));
- if (!STAILQ_EMPTY(&conn->outbound_queue)) {
- /*
- * Output to this site is currently owned by the select()
- * thread, so we can't try sending in-line here. We can only
- * queue the msg for later.
- */
- RPRINT(dbenv, (dbenv, "msg to %s to be queued",
- __repmgr_format_eid_loc(dbenv->rep_handle,
- conn->eid, buffer)));
- if (conn->out_queue_length < OUT_QUEUE_LIMIT)
- return (enqueue_msg(dbenv, conn, msg, 0));
- else {
- RPRINT(dbenv, (dbenv, "queue limit exceeded"));
- STAT(dbenv->rep_handle->
- region->mstat.st_msgs_dropped++);
- return (0);
- }
- }
-
- /*
- * Send as much data to the site as we can, without blocking. Keep
- * writing as long as we're making some progress. Make a scratch copy
- * of iovecs for our use, since we destroy it in the process of
- * adjusting pointers after each partial I/O.
- */
- memcpy(&iovecs, &msg->iovecs, sizeof(iovecs));
- total_written = 0;
- while ((ret = __repmgr_writev(conn->fd, &iovecs.vectors[iovecs.offset],
- iovecs.count-iovecs.offset, &nw)) == 0) {
- total_written += nw;
- if (__repmgr_update_consumed(&iovecs, nw)) /* all written */
- return (0);
- }
-
- if (ret != WOULDBLOCK) {
- __db_err(dbenv, ret, "socket writing failure");
- return (DB_REP_UNAVAIL);
- }
-
- RPRINT(dbenv, (dbenv, "wrote only %lu bytes to %s",
- (u_long)total_written,
- __repmgr_format_eid_loc(dbenv->rep_handle, conn->eid, buffer)));
- /*
- * We can't send any more without blocking: queue (a pointer to) a
- * "flattened" copy of the message, so that the select() thread will
- * finish sending it later.
- */
- if ((ret = enqueue_msg(dbenv, conn, msg, total_written)) != 0)
- return (ret);
-
- STAT(dbenv->rep_handle->region->mstat.st_msgs_queued++);
-
- /*
- * Wake the main select thread so that it can discover that it has
- * received ownership of this connection. Note that we didn't have to
- * do this in the previous case (above), because the non-empty queue
- * implies that the select() thread is already managing ownership of
- * this connection.
- */
-#ifdef DB_WIN32
- if (WSAEventSelect(conn->fd, conn->event_object,
- FD_READ|FD_WRITE|FD_CLOSE) == SOCKET_ERROR) {
- ret = net_errno;
- __db_err(dbenv, ret, "can't add FD_WRITE event bit");
- return (ret);
- }
-#endif
- return (__repmgr_wake_main_thread(dbenv));
-}
-
-/*
- * PUBLIC: int __repmgr_is_permanent __P((DB_ENV *, const DB_LSN *));
- *
- * Count up how many sites have ack'ed the given LSN. Returns TRUE if enough
- * sites have ack'ed; FALSE otherwise.
- *
- * !!!
- * Caller must hold the mutex.
- */
-int
-__repmgr_is_permanent(dbenv, lsnp)
- DB_ENV *dbenv;
- const DB_LSN *lsnp;
-{
- DB_REP *db_rep;
- REPMGR_SITE *site;
- u_int eid, nsites, npeers;
- int is_perm, has_missing_peer;
-
- db_rep = dbenv->rep_handle;
-
- if (db_rep->perm_policy == DB_REPMGR_ACKS_NONE)
- return (TRUE);
-
- nsites = npeers = 0;
- has_missing_peer = FALSE;
- for (eid = 0; eid < db_rep->site_cnt; eid++) {
- site = SITE_FROM_EID(eid);
- if (site->priority == -1) {
- /*
- * Never connected to this site: since we can't know
- * whether it's a peer, assume the worst.
- */
- has_missing_peer = TRUE;
- continue;
- }
-
- if (log_compare(&site->max_ack, lsnp) >= 0) {
- nsites++;
- if (site->priority > 0)
- npeers++;
- } else {
- /* This site hasn't ack'ed the message. */
- if (site->priority > 0)
- has_missing_peer = TRUE;
- }
- }
-
- switch (db_rep->perm_policy) {
- case DB_REPMGR_ACKS_ONE:
- is_perm = (nsites >= 1);
- break;
- case DB_REPMGR_ACKS_ONE_PEER:
- is_perm = (npeers >= 1);
- break;
- case DB_REPMGR_ACKS_QUORUM:
- /*
- * The minimum number of acks necessary to ensure that the
- * transaction is durable if an election is held (given that we
- * always conduct elections according to the standard,
- * recommended practice of requiring votes from a majority of
- * sites).
- */
- if (__repmgr_get_nsites(db_rep) == 2) {
- /*
- * A group of 2 sites is, as always, a special case.
- * For a transaction to be durable the other site has to
- * have received it.
- */
- is_perm = (npeers >= 1);
- } else
- is_perm = (npeers >= (__repmgr_get_nsites(db_rep)-1)/2);
- break;
- case DB_REPMGR_ACKS_ALL:
- /* Adjust by 1, since get_nsites includes local site. */
- is_perm = (nsites >= __repmgr_get_nsites(db_rep) - 1);
- break;
- case DB_REPMGR_ACKS_ALL_PEERS:
- if (db_rep->site_cnt < __repmgr_get_nsites(db_rep) - 1) {
- /* Assume missing site might be a peer. */
- has_missing_peer = TRUE;
- }
- is_perm = !has_missing_peer;
- break;
- default:
- is_perm = FALSE;
- (void)__db_unknown_path(dbenv, "__repmgr_is_permanent");
- }
- return (is_perm);
-}
-
-/*
- * Abandons a connection, to recover from an error. Upon entry the conn struct
- * must be on the connections list.
- *
- * If the 'do_close' flag is true, we do the whole job; the clean-up includes
- * removing the struct from the list and freeing all its memory, so upon return
- * the caller must not refer to it any further. Otherwise, we merely mark the
- * connection for clean-up later by the main thread.
- *
- * PUBLIC: int __repmgr_bust_connection __P((DB_ENV *,
- * PUBLIC: REPMGR_CONNECTION *, int));
- *
- * !!!
- * Caller holds mutex.
- */
-int
-__repmgr_bust_connection(dbenv, conn, do_close)
- DB_ENV *dbenv;
- REPMGR_CONNECTION *conn;
- int do_close;
-{
- DB_REP *db_rep;
- int connecting, ret, eid;
-
- db_rep = dbenv->rep_handle;
- ret = 0;
-
- DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
- eid = conn->eid;
- connecting = F_ISSET(conn, CONN_CONNECTING);
- if (do_close)
- __repmgr_cleanup_connection(dbenv, conn);
- else {
- F_SET(conn, CONN_DEFUNCT);
- conn->eid = -1;
- }
-
- /*
- * When we first accepted the incoming connection, we set conn->eid to
- * -1 to indicate that we didn't yet know what site it might be from.
- * If we then get here because we later decide it was a redundant
- * connection, the following scary stuff will correctly not happen.
- */
- if (IS_VALID_EID(eid)) {
- /* schedule_connection_attempt wakes the main thread. */
- if ((ret = __repmgr_schedule_connection_attempt(
- dbenv, (u_int)eid, FALSE)) != 0)
- return (ret);
-
- /*
- * If this connection had gotten no further than the CONNECTING
- * state, this can't count as a loss of connection to the
- * master.
- */
- if (!connecting && eid == db_rep->master_eid) {
- (void)__memp_set_config(
- dbenv, DB_MEMP_SYNC_INTERRUPT, 1);
- if ((ret = __repmgr_init_election(
- dbenv, ELECT_FAILURE_ELECTION)) != 0)
- return (ret);
- }
- } else if (!do_close) {
- /*
- * One way or another, make sure the main thread is poked, so
- * that we do the deferred clean-up.
- */
- ret = __repmgr_wake_main_thread(dbenv);
- }
- return (ret);
-}
-
-/*
- * PUBLIC: void __repmgr_cleanup_connection
- * PUBLIC: __P((DB_ENV *, REPMGR_CONNECTION *));
- */
-void
-__repmgr_cleanup_connection(dbenv, conn)
- DB_ENV *dbenv;
- REPMGR_CONNECTION *conn;
-{
- DB_REP *db_rep;
- QUEUED_OUTPUT *out;
- REPMGR_FLAT *msg;
- DBT *dbt;
-
- db_rep = dbenv->rep_handle;
-
- TAILQ_REMOVE(&db_rep->connections, conn, entries);
- if (conn->fd != INVALID_SOCKET) {
- (void)closesocket(conn->fd);
-#ifdef DB_WIN32
- (void)WSACloseEvent(conn->event_object);
-#endif
- }
-
- /*
- * Deallocate any input and output buffers we may have.
- */
- if (conn->reading_phase == DATA_PHASE) {
- if (conn->msg_type == REPMGR_REP_MESSAGE)
- __os_free(dbenv, conn->input.rep_message);
- else {
- dbt = &conn->input.repmgr_msg.cntrl;
- __os_free(dbenv, dbt->data);
- dbt = &conn->input.repmgr_msg.rec;
- if (dbt->size > 0)
- __os_free(dbenv, dbt->data);
- }
- }
- while (!STAILQ_EMPTY(&conn->outbound_queue)) {
- out = STAILQ_FIRST(&conn->outbound_queue);
- STAILQ_REMOVE_HEAD(&conn->outbound_queue, entries);
- msg = out->msg;
- if (--msg->ref_count <= 0)
- __os_free(dbenv, msg);
- __os_free(dbenv, out);
- }
-
- __os_free(dbenv, conn);
-}
-
-static int
-enqueue_msg(dbenv, conn, msg, offset)
- DB_ENV *dbenv;
- REPMGR_CONNECTION *conn;
- struct sending_msg *msg;
- size_t offset;
-{
- QUEUED_OUTPUT *q_element;
- int ret;
-
- if (msg->fmsg == NULL && ((ret = flatten(dbenv, msg)) != 0))
- return (ret);
- if ((ret = __os_malloc(dbenv, sizeof(QUEUED_OUTPUT), &q_element)) != 0)
- return (ret);
- q_element->msg = msg->fmsg;
- msg->fmsg->ref_count++; /* encapsulation would be sweeter */
- q_element->offset = offset;
-
- /* Put it on the connection's outbound queue. */
- STAILQ_INSERT_TAIL(&conn->outbound_queue, q_element, entries);
- conn->out_queue_length++;
- return (0);
-}
-
-/*
- * The 'rec' DBT can be NULL, in which case we treat it like a zero-length DBT.
- * But 'control' is always present.
- */
-static void
-setup_sending_msg(msg, type, control, rec)
- struct sending_msg *msg;
- u_int type;
- const DBT *control, *rec;
-{
- u_int32_t rec_size;
-
- /*
- * The wire protocol is documented in a comment at the top of this
- * module.
- */
- __repmgr_iovec_init(&msg->iovecs);
- msg->type = type;
- __repmgr_add_buffer(&msg->iovecs, &msg->type, sizeof(msg->type));
-
- msg->control_size_buf = htonl(control->size);
- __repmgr_add_buffer(&msg->iovecs,
- &msg->control_size_buf, sizeof(msg->control_size_buf));
-
- rec_size = rec == NULL ? 0 : rec->size;
- msg->rec_size_buf = htonl(rec_size);
- __repmgr_add_buffer(
- &msg->iovecs, &msg->rec_size_buf, sizeof(msg->rec_size_buf));
-
- if (control->size > 0)
- __repmgr_add_dbt(&msg->iovecs, control);
-
- if (rec_size > 0)
- __repmgr_add_dbt(&msg->iovecs, rec);
-
- msg->fmsg = NULL;
-}
-
-/*
- * Convert a message stored as iovec pointers to various pieces, into flattened
- * form, by copying all the pieces, and then make the iovec just point to the
- * new simplified form.
- */
-static int
-flatten(dbenv, msg)
- DB_ENV *dbenv;
- struct sending_msg *msg;
-{
- u_int8_t *p;
- size_t msg_size;
- int i, ret;
-
- DB_ASSERT(dbenv, msg->fmsg == NULL);
-
- msg_size = msg->iovecs.total_bytes;
- if ((ret = __os_malloc(dbenv, sizeof(*msg->fmsg) + msg_size,
- &msg->fmsg)) != 0)
- return (ret);
- msg->fmsg->length = msg_size;
- msg->fmsg->ref_count = 0;
- p = &msg->fmsg->data[0];
-
- for (i = 0; i < msg->iovecs.count; i++) {
- memcpy(p, msg->iovecs.vectors[i].iov_base,
- msg->iovecs.vectors[i].iov_len);
- p = &p[msg->iovecs.vectors[i].iov_len];
- }
- __repmgr_iovec_init(&msg->iovecs);
- __repmgr_add_buffer(&msg->iovecs, &msg->fmsg->data[0], msg_size);
- return (0);
-}
-
-/*
- * PUBLIC: int __repmgr_find_site __P((DB_ENV *, const char *, u_int));
- */
-int
-__repmgr_find_site(dbenv, host, port)
- DB_ENV *dbenv;
- const char *host;
- u_int port;
-{
- DB_REP *db_rep;
- REPMGR_SITE *site;
- u_int i;
-
- db_rep = dbenv->rep_handle;
- for (i = 0; i < db_rep->site_cnt; i++) {
- site = &db_rep->sites[i];
-
- if (strcmp(site->net_addr.host, host) == 0 &&
- site->net_addr.port == port)
- return ((int)i);
- }
-
- return (-1);
-}
-
-/*
- * Stash a copy of the given host name and port number into a convenient data
- * structure so that we can save it permanently. This is kind of like a
- * constructor for a netaddr object, except that the caller supplies the memory
- * for the base struct (though not the subordinate attachments).
- *
- * All inputs are assumed to have been already validated.
- *
- * PUBLIC: int __repmgr_pack_netaddr __P((DB_ENV *, const char *,
- * PUBLIC: u_int, ADDRINFO *, repmgr_netaddr_t *));
- */
-int
-__repmgr_pack_netaddr(dbenv, host, port, list, addr)
- DB_ENV *dbenv;
- const char *host;
- u_int port;
- ADDRINFO *list;
- repmgr_netaddr_t *addr;
-{
- int ret;
-
- DB_ASSERT(dbenv, host != NULL);
-
- if ((ret = __os_strdup(dbenv, host, &addr->host)) != 0)
- return (ret);
- addr->port = (u_int16_t)port;
- addr->address_list = list;
- addr->current = NULL;
- return (0);
-}
-
-/*
- * PUBLIC: int __repmgr_getaddr __P((DB_ENV *,
- * PUBLIC: const char *, u_int, int, ADDRINFO **));
- */
-int
-__repmgr_getaddr(dbenv, host, port, flags, result)
- DB_ENV *dbenv;
- const char *host;
- u_int port;
- int flags; /* Matches struct addrinfo declaration. */
- ADDRINFO **result;
-{
- ADDRINFO *answer, hints;
- char buffer[10]; /* 2**16 fits in 5 digits. */
-#ifdef DB_WIN32
- int ret;
-#endif
-
- /*
- * Ports are really 16-bit unsigned values, but it's too painful to
- * push that type through the API.
- */
- if (port > UINT16_MAX) {
- __db_errx(dbenv, "port %u larger than max port %u",
- port, UINT16_MAX);
- return (EINVAL);
- }
-
-#ifdef DB_WIN32
- if (!dbenv->rep_handle->wsa_inited &&
- (ret = __repmgr_wsa_init(dbenv)) != 0)
- return (ret);
-#endif
-
- memset(&hints, 0, sizeof(hints));
- hints.ai_family = AF_UNSPEC;
- hints.ai_socktype = SOCK_STREAM;
- hints.ai_flags = flags;
- (void)snprintf(buffer, sizeof(buffer), "%u", port);
-
- /*
- * Although it's generally bad to discard error information, the return
- * code from __db_getaddrinfo is undependable. Our callers at least
- * would like to be able to distinguish errors in getaddrinfo (which we
- * want to consider to be re-tryable), from other failure (e.g., EINVAL,
- * above).
- */
- if (__db_getaddrinfo(dbenv, host, port, buffer, &hints, &answer) != 0)
- return (DB_REP_UNAVAIL);
- *result = answer;
-
- return (0);
-}
-
-/*
- * Adds a new site to our array of known sites (unless it already exists),
- * and schedules it for immediate connection attempt. Whether it exists or not,
- * we set newsitep, either to the already existing site, or to the newly created
- * site. Unless newsitep is passed in as NULL, which is allowed.
- *
- * PUBLIC: int __repmgr_add_site
- * PUBLIC: __P((DB_ENV *, const char *, u_int, REPMGR_SITE **));
- *
- * !!!
- * Caller is expected to hold the mutex.
- */
-int
-__repmgr_add_site(dbenv, host, port, newsitep)
- DB_ENV *dbenv;
- const char *host;
- u_int port;
- REPMGR_SITE **newsitep;
-{
- DB_REP *db_rep;
- ADDRINFO *address_list;
- repmgr_netaddr_t addr;
- REPMGR_SITE *site;
- int ret, eid;
-
- ret = 0;
- db_rep = dbenv->rep_handle;
-
- if (IS_VALID_EID(eid = __repmgr_find_site(dbenv, host, port))) {
- site = SITE_FROM_EID(eid);
- ret = EEXIST;
- goto out;
- }
-
- if ((ret = __repmgr_getaddr(
- dbenv, host, port, 0, &address_list)) == DB_REP_UNAVAIL) {
- /* Allow re-tryable errors. We'll try again later. */
- address_list = NULL;
- } else if (ret != 0)
- return (ret);
-
- if ((ret = __repmgr_pack_netaddr(
- dbenv, host, port, address_list, &addr)) != 0) {
- __db_freeaddrinfo(dbenv, address_list);
- return (ret);
- }
-
- if ((ret = __repmgr_new_site(dbenv, &site, &addr, SITE_IDLE)) != 0) {
- __repmgr_cleanup_netaddr(dbenv, &addr);
- return (ret);
- }
-
- if (db_rep->selector != NULL &&
- (ret = __repmgr_schedule_connection_attempt(
- dbenv, (u_int)EID_FROM_SITE(site), TRUE)) != 0)
- return (ret);
-
- /* Note that we should only come here for success and EEXIST. */
-out:
- if (newsitep != NULL)
- *newsitep = site;
- return (ret);
-}
-
-/*
- * Initializes net-related memory in the db_rep handle.
- *
- * PUBLIC: int __repmgr_net_create __P((DB_ENV *, DB_REP *));
- */
-int
-__repmgr_net_create(dbenv, db_rep)
- DB_ENV *dbenv;
- DB_REP *db_rep;
-{
- COMPQUIET(dbenv, NULL);
-
- db_rep->listen_fd = INVALID_SOCKET;
- db_rep->master_eid = DB_EID_INVALID;
-
- TAILQ_INIT(&db_rep->connections);
- TAILQ_INIT(&db_rep->retries);
-
- return (0);
-}
-
-/*
- * listen_socket_init --
- * Initialize a socket for listening. Sets
- * a file descriptor for the socket, ready for an accept() call
- * in a thread that we're happy to let block.
- *
- * PUBLIC: int __repmgr_listen __P((DB_ENV *));
- */
-int
-__repmgr_listen(dbenv)
- DB_ENV *dbenv;
-{
- DB_REP *db_rep;
- ADDRINFO *ai;
- char *why;
- int sockopt, ret;
- socket_t s;
-
- db_rep = dbenv->rep_handle;
-
- /* Use OOB value as sentinel to show no socket open. */
- s = INVALID_SOCKET;
- ai = ADDR_LIST_FIRST(&db_rep->my_addr);
-
- /*
- * Given the assert is correct, we execute the loop at least once, which
- * means 'why' will have been set by the time it's needed. But I guess
- * lint doesn't know about DB_ASSERT.
- */
- COMPQUIET(why, "");
- DB_ASSERT(dbenv, ai != NULL);
- for (; ai != NULL; ai = ADDR_LIST_NEXT(&db_rep->my_addr)) {
-
- if ((s = socket(ai->ai_family,
- ai->ai_socktype, ai->ai_protocol)) == INVALID_SOCKET) {
- why = "can't create listen socket";
- continue;
- }
-
- /*
- * When testing, it's common to kill and restart regularly. On
- * some systems, this causes bind to fail with "address in use"
- * errors unless this option is set.
- */
- sockopt = 1;
- if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (sockopt_t)&sockopt,
- sizeof(sockopt)) != 0) {
- why = "can't set REUSEADDR socket option";
- break;
- }
-
- if (bind(s, ai->ai_addr, (socklen_t)ai->ai_addrlen) != 0) {
- why = "can't bind socket to listening address";
- (void)closesocket(s);
- s = INVALID_SOCKET;
- continue;
- }
-
- if (listen(s, 5) != 0) {
- why = "listen()";
- break;
- }
-
- if ((ret = __repmgr_set_nonblocking(s)) != 0) {
- __db_err(dbenv, ret, "can't unblock listen socket");
- goto clean;
- }
-
- db_rep->listen_fd = s;
- return (0);
- }
-
- ret = net_errno;
- __db_err(dbenv, ret, why);
-clean: if (s != INVALID_SOCKET)
- (void)closesocket(s);
- return (ret);
-}
-
-/*
- * PUBLIC: int __repmgr_net_close __P((DB_ENV *));
- */
-int
-__repmgr_net_close(dbenv)
- DB_ENV *dbenv;
-{
- DB_REP *db_rep;
- REPMGR_CONNECTION *conn;
-#ifndef DB_WIN32
- struct sigaction sigact;
-#endif
- int ret;
-
- db_rep = dbenv->rep_handle;
- if (db_rep->listen_fd == INVALID_SOCKET)
- return (0);
-
- TAILQ_FOREACH(conn, &db_rep->connections, entries) {
- if (conn->fd != INVALID_SOCKET) {
- (void)closesocket(conn->fd);
- conn->fd = INVALID_SOCKET;
-#ifdef DB_WIN32
- (void)WSACloseEvent(conn->event_object);
-#endif
- }
- }
-
- ret = 0;
- if (closesocket(db_rep->listen_fd) == SOCKET_ERROR)
- ret = net_errno;
-
-#ifdef DB_WIN32
- /* Shut down the Windows sockets DLL. */
- if (WSACleanup() == SOCKET_ERROR && ret == 0)
- ret = net_errno;
- db_rep->wsa_inited = FALSE;
-#else
- /* Restore original SIGPIPE handling configuration. */
- if (db_rep->chg_sig_handler) {
- memset(&sigact, 0, sizeof(sigact));
- sigact.sa_handler = SIG_DFL;
- if (sigaction(SIGPIPE, &sigact, NULL) == -1 && ret == 0)
- ret = errno;
- }
-#endif
- db_rep->listen_fd = INVALID_SOCKET;
- return (ret);
-}
-
-/*
- * PUBLIC: void __repmgr_net_destroy __P((DB_ENV *, DB_REP *));
- */
-void
-__repmgr_net_destroy(dbenv, db_rep)
- DB_ENV *dbenv;
- DB_REP *db_rep;
-{
- REPMGR_CONNECTION *conn;
- REPMGR_RETRY *retry;
- REPMGR_SITE *site;
- u_int i;
-
- __repmgr_cleanup_netaddr(dbenv, &db_rep->my_addr);
-
- if (db_rep->sites == NULL)
- return;
-
- while (!TAILQ_EMPTY(&db_rep->retries)) {
- retry = TAILQ_FIRST(&db_rep->retries);
- TAILQ_REMOVE(&db_rep->retries, retry, entries);
- __os_free(dbenv, retry);
- }
-
- while (!TAILQ_EMPTY(&db_rep->connections)) {
- conn = TAILQ_FIRST(&db_rep->connections);
- __repmgr_cleanup_connection(dbenv, conn);
- }
-
- for (i = 0; i < db_rep->site_cnt; i++) {
- site = &db_rep->sites[i];
- __repmgr_cleanup_netaddr(dbenv, &site->net_addr);
- }
- __os_free(dbenv, db_rep->sites);
- db_rep->sites = NULL;
-}
diff --git a/db/repmgr/repmgr_posix.c b/db/repmgr/repmgr_posix.c
deleted file mode 100644
index fde88262d..000000000
--- a/db/repmgr/repmgr_posix.c
+++ /dev/null
@@ -1,691 +0,0 @@
-/*-
- * See the file LICENSE for redistribution information.
- *
- * Copyright (c) 2005,2007 Oracle. All rights reserved.
- *
- * $Id: repmgr_posix.c,v 1.29 2007/06/11 18:29:34 alanb Exp $
- */
-
-#include "db_config.h"
-
-#define __INCLUDE_NETWORKING 1
-#define __INCLUDE_SELECT_H 1
-#include "db_int.h"
-
-/*
- * A very rough guess at the maximum stack space one of our threads could ever
- * need, which we hope is plenty conservative. This can be patched in the field
- * if necessary.
- */
-#ifdef _POSIX_THREAD_ATTR_STACKSIZE
-size_t __repmgr_guesstimated_max = (128 * 1024);
-#endif
-
-static int finish_connecting __P((DB_ENV *, REPMGR_CONNECTION *));
-
-/*
- * Starts the thread described in the argument, and stores the resulting thread
- * ID therein.
- *
- * PUBLIC: int __repmgr_thread_start __P((DB_ENV *, REPMGR_RUNNABLE *));
- */
-int
-__repmgr_thread_start(dbenv, runnable)
- DB_ENV *dbenv;
- REPMGR_RUNNABLE *runnable;
-{
- pthread_attr_t *attrp;
-#ifdef _POSIX_THREAD_ATTR_STACKSIZE
- pthread_attr_t attributes;
- size_t size;
- int ret;
-#endif
-
- runnable->finished = FALSE;
-
-#ifdef _POSIX_THREAD_ATTR_STACKSIZE
- attrp = &attributes;
- if ((ret = pthread_attr_init(&attributes)) != 0) {
- __db_err(dbenv,
- ret, "pthread_attr_init in repmgr_thread_start");
- return (ret);
- }
-
- /*
- * On a 64-bit machine it seems reasonable that we could need twice as
- * much stack space as we did on a 32-bit machine.
- */
- size = __repmgr_guesstimated_max;
- if (sizeof(size_t) > 4)
- size *= 2;
-#ifdef PTHREAD_STACK_MIN
- if (size < PTHREAD_STACK_MIN)
- size = PTHREAD_STACK_MIN;
-#endif
- if ((ret = pthread_attr_setstacksize(&attributes, size)) != 0) {
- __db_err(dbenv,
- ret, "pthread_attr_setstacksize in repmgr_thread_start");
- return (ret);
- }
-#else
- attrp = NULL;
-#endif
-
- return (pthread_create(&runnable->thread_id, attrp,
- runnable->run, dbenv));
-}
-
-/*
- * PUBLIC: int __repmgr_thread_join __P((REPMGR_RUNNABLE *));
- */
-int
-__repmgr_thread_join(thread)
- REPMGR_RUNNABLE *thread;
-{
- return (pthread_join(thread->thread_id, NULL));
-}
-
-/*
- * PUBLIC: int __repmgr_set_nonblocking __P((socket_t));
- */
-int
-__repmgr_set_nonblocking(fd)
- socket_t fd;
-{
- int flags;
-
- if ((flags = fcntl(fd, F_GETFL, 0)) < 0)
- return (errno);
- if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0)
- return (errno);
- return (0);
-}
-
-/*
- * PUBLIC: int __repmgr_wake_waiting_senders __P((DB_ENV *));
- *
- * Wake any send()-ing threads waiting for an acknowledgement.
- *
- * !!!
- * Caller must hold the db_rep->mutex, if this thread synchronization is to work
- * properly.
- */
-int
-__repmgr_wake_waiting_senders(dbenv)
- DB_ENV *dbenv;
-{
- return (pthread_cond_broadcast(&dbenv->rep_handle->ack_condition));
-}
-
-/*
- * PUBLIC: int __repmgr_await_ack __P((DB_ENV *, const DB_LSN *));
- *
- * Waits (a limited time) for configured number of remote sites to ack the given
- * LSN.
- *
- * !!!
- * Caller must hold repmgr->mutex.
- */
-int
-__repmgr_await_ack(dbenv, lsnp)
- DB_ENV *dbenv;
- const DB_LSN *lsnp;
-{
- DB_REP *db_rep;
- struct timespec deadline;
- int ret, timed;
-
- db_rep = dbenv->rep_handle;
-
- if ((timed = (db_rep->ack_timeout > 0)))
- __repmgr_compute_wait_deadline(dbenv, &deadline,
- db_rep->ack_timeout);
- else
- COMPQUIET(deadline.tv_sec, 0);
-
- while (!__repmgr_is_permanent(dbenv, lsnp)) {
- if (timed)
- ret = pthread_cond_timedwait(&db_rep->ack_condition,
- &db_rep->mutex, &deadline);
- else
- ret = pthread_cond_wait(&db_rep->ack_condition,
- &db_rep->mutex);
- if (db_rep->finished)
- return (DB_REP_UNAVAIL);
- if (ret != 0)
- return (ret);
- }
- return (0);
-}
-
-/*
- * Computes a deadline time a certain distance into the future, in a form
- * suitable for the pthreads timed wait operation. Curiously, that call uses
- * nano-second resolution; elsewhere we use microseconds.
- *
- * PUBLIC: void __repmgr_compute_wait_deadline __P((DB_ENV*,
- * PUBLIC: struct timespec *, db_timeout_t));
- */
-void
-__repmgr_compute_wait_deadline(dbenv, result, wait)
- DB_ENV *dbenv;
- struct timespec *result;
- db_timeout_t wait;
-{
- db_timespec v;
-
- /*
- * Start with "now"; then add the "wait" offset.
- *
- * A db_timespec is the same as a "struct timespec" so we can pass
- * result directly to the underlying Berkeley DB OS routine.
- */
- __os_gettime(dbenv, (db_timespec *)result);
-
- /* Convert microsecond wait to a timespec. */
- DB_TIMEOUT_TO_TIMESPEC(wait, &v);
-
- timespecadd(result, &v);
-}
-
-/*
- * PUBLIC: int __repmgr_init_sync __P((DB_ENV *, DB_REP *));
- *
- * Allocate/initialize all data necessary for thread synchronization. This
- * should be an all-or-nothing affair. Other than here and in _close_sync there
- * should never be a time when these resources aren't either all allocated or
- * all freed. If that's true, then we can safely use the values of the file
- * descriptor(s) to keep track of which it is.
- */
-int
-__repmgr_init_sync(dbenv, db_rep)
- DB_ENV *dbenv;
- DB_REP *db_rep;
-{
- int ret, mutex_inited, ack_inited, elect_inited, queue_inited,
- file_desc[2];
-
- COMPQUIET(dbenv, NULL);
-
- mutex_inited = ack_inited = elect_inited = queue_inited = FALSE;
-
- if ((ret = pthread_mutex_init(&db_rep->mutex, NULL)) != 0)
- goto err;
- mutex_inited = TRUE;
-
- if ((ret = pthread_cond_init(&db_rep->ack_condition, NULL)) != 0)
- goto err;
- ack_inited = TRUE;
-
- if ((ret = pthread_cond_init(&db_rep->check_election, NULL)) != 0)
- goto err;
- elect_inited = TRUE;
-
- if ((ret = pthread_cond_init(&db_rep->queue_nonempty, NULL)) != 0)
- goto err;
- queue_inited = TRUE;
-
- if ((ret = pipe(file_desc)) == -1) {
- ret = errno;
- goto err;
- }
-
- db_rep->read_pipe = file_desc[0];
- db_rep->write_pipe = file_desc[1];
- return (0);
-err:
- if (queue_inited)
- (void)pthread_cond_destroy(&db_rep->queue_nonempty);
- if (elect_inited)
- (void)pthread_cond_destroy(&db_rep->check_election);
- if (ack_inited)
- (void)pthread_cond_destroy(&db_rep->ack_condition);
- if (mutex_inited)
- (void)pthread_mutex_destroy(&db_rep->mutex);
- db_rep->read_pipe = db_rep->write_pipe = -1;
-
- return (ret);
-}
-
-/*
- * PUBLIC: int __repmgr_close_sync __P((DB_ENV *));
- *
- * Frees the thread synchronization data within a repmgr struct, in a
- * platform-specific way.
- */
-int
-__repmgr_close_sync(dbenv)
- DB_ENV *dbenv;
-{
- DB_REP *db_rep;
- int ret, t_ret;
-
- db_rep = dbenv->rep_handle;
-
- if (!(REPMGR_SYNC_INITED(db_rep)))
- return (0);
-
- ret = pthread_cond_destroy(&db_rep->queue_nonempty);
-
- if ((t_ret = pthread_cond_destroy(&db_rep->check_election)) != 0 &&
- ret == 0)
- ret = t_ret;
-
- if ((t_ret = pthread_cond_destroy(&db_rep->ack_condition)) != 0 &&
- ret == 0)
- ret = t_ret;
-
- if ((t_ret = pthread_mutex_destroy(&db_rep->mutex)) != 0 &&
- ret == 0)
- ret = t_ret;
-
- if (close(db_rep->read_pipe) == -1 && ret == 0)
- ret = errno;
- if (close(db_rep->write_pipe) == -1 && ret == 0)
- ret = errno;
-
- db_rep->read_pipe = db_rep->write_pipe = -1;
- return (ret);
-}
-
-/*
- * Performs net-related resource initialization other than memory initialization
- * and allocation. A valid db_rep->listen_fd acts as the "all-or-nothing"
- * sentinel signifying that these resources are allocated.
- *
- * PUBLIC: int __repmgr_net_init __P((DB_ENV *, DB_REP *));
- */
-int
-__repmgr_net_init(dbenv, db_rep)
- DB_ENV *dbenv;
- DB_REP *db_rep;
-{
- int ret;
- struct sigaction sigact;
-
- if ((ret = __repmgr_listen(dbenv)) != 0)
- return (ret);
-
- /*
- * Make sure we're not ignoring SIGPIPE, 'cuz otherwise we'd be killed
- * just for trying to write onto a socket that had been reset.
- */
- if (sigaction(SIGPIPE, NULL, &sigact) == -1) {
- ret = errno;
- __db_err(dbenv, ret, "can't access signal handler");
- goto err;
- }
- /*
- * If we need to change the sig handler, do so, and also set a flag so
- * that we remember we did.
- */
- if ((db_rep->chg_sig_handler = (sigact.sa_handler == SIG_DFL))) {
- sigact.sa_handler = SIG_IGN;
- sigact.sa_flags = 0;
- if (sigaction(SIGPIPE, &sigact, NULL) == -1) {
- ret = errno;
- __db_err(dbenv, ret, "can't access signal handler");
- goto err;
- }
- }
- return (0);
-
-err:
- (void)closesocket(db_rep->listen_fd);
- db_rep->listen_fd = INVALID_SOCKET;
- return (ret);
-}
-
-/*
- * PUBLIC: int __repmgr_lock_mutex __P((mgr_mutex_t *));
- */
-int
-__repmgr_lock_mutex(mutex)
- mgr_mutex_t *mutex;
-{
- return (pthread_mutex_lock(mutex));
-}
-
-/*
- * PUBLIC: int __repmgr_unlock_mutex __P((mgr_mutex_t *));
- */
-int
-__repmgr_unlock_mutex(mutex)
- mgr_mutex_t *mutex;
-{
- return (pthread_mutex_unlock(mutex));
-}
-
-/*
- * Signals a condition variable.
- *
- * !!!
- * Caller must hold mutex.
- *
- * PUBLIC: int __repmgr_signal __P((cond_var_t *));
- */
-int
-__repmgr_signal(v)
- cond_var_t *v;
-{
- return (pthread_cond_broadcast(v));
-}
-
-/*
- * PUBLIC: int __repmgr_wake_main_thread __P((DB_ENV*));
- */
-int
-__repmgr_wake_main_thread(dbenv)
- DB_ENV *dbenv;
-{
- DB_REP *db_rep;
- u_int8_t any_value;
-
- COMPQUIET(any_value, 0);
- db_rep = dbenv->rep_handle;
-
- /*
- * It doesn't matter what byte value we write. Just the appearance of a
- * byte in the stream is enough to wake up the select() thread reading
- * the pipe.
- */
- if (write(db_rep->write_pipe, &any_value, 1) == -1)
- return (errno);
- return (0);
-}
-
-/*
- * PUBLIC: int __repmgr_writev __P((socket_t, db_iovec_t *, int, size_t *));
- */
-int
-__repmgr_writev(fd, iovec, buf_count, byte_count_p)
- socket_t fd;
- db_iovec_t *iovec;
- int buf_count;
- size_t *byte_count_p;
-{
- int nw;
-
- if ((nw = writev(fd, iovec, buf_count)) == -1)
- return (errno);
- *byte_count_p = (size_t)nw;
- return (0);
-}
-
-/*
- * PUBLIC: int __repmgr_readv __P((socket_t, db_iovec_t *, int, size_t *));
- */
-int
-__repmgr_readv(fd, iovec, buf_count, byte_count_p)
- socket_t fd;
- db_iovec_t *iovec;
- int buf_count;
- size_t *byte_count_p;
-{
- ssize_t nw;
-
- if ((nw = readv(fd, iovec, buf_count)) == -1)
- return (errno);
- *byte_count_p = (size_t)nw;
- return (0);
-}
-
-/*
- * PUBLIC: int __repmgr_select_loop __P((DB_ENV *));
- */
-int
-__repmgr_select_loop(dbenv)
- DB_ENV *dbenv;
-{
- struct timeval select_timeout, *select_timeout_p;
- DB_REP *db_rep;
- REPMGR_CONNECTION *conn, *next;
- REPMGR_RETRY *retry;
- db_timespec timeout;
- fd_set reads, writes;
- int ret, flow_control, maxfd, nready;
- u_int8_t buf[10]; /* arbitrary size */
-
- flow_control = FALSE;
-
- db_rep = dbenv->rep_handle;
- /*
- * Almost this entire thread operates while holding the mutex. But note
- * that it never blocks, except in the call to select() (which is the
- * one place we relinquish the mutex).
- */
- LOCK_MUTEX(db_rep->mutex);
- if ((ret = __repmgr_first_try_connections(dbenv)) != 0)
- goto out;
- for (;;) {
- FD_ZERO(&reads);
- FD_ZERO(&writes);
-
- /*
- * Always ask for input on listening socket and signalling
- * pipe.
- */
- FD_SET((u_int)db_rep->listen_fd, &reads);
- maxfd = db_rep->listen_fd;
-
- FD_SET((u_int)db_rep->read_pipe, &reads);
- if (db_rep->read_pipe > maxfd)
- maxfd = db_rep->read_pipe;
-
- /*
- * Examine all connections to see what sort of I/O to ask for on
- * each one.
- */
- TAILQ_FOREACH(conn, &db_rep->connections, entries) {
- if (F_ISSET(conn, CONN_CONNECTING)) {
- FD_SET((u_int)conn->fd, &reads);
- FD_SET((u_int)conn->fd, &writes);
- if (conn->fd > maxfd)
- maxfd = conn->fd;
- continue;
- }
-
- if (!STAILQ_EMPTY(&conn->outbound_queue)) {
- FD_SET((u_int)conn->fd, &writes);
- if (conn->fd > maxfd)
- maxfd = conn->fd;
- }
- /*
- * If we haven't yet gotten site's handshake, then read
- * from it even if we're flow-controlling.
- */
- if (!flow_control || !IS_VALID_EID(conn->eid)) {
- FD_SET((u_int)conn->fd, &reads);
- if (conn->fd > maxfd)
- maxfd = conn->fd;
- }
- }
- /*
- * Decide how long to wait based on when it will next be time to
- * retry an idle connection. (List items are in order, so we
- * only have to examine the first one.)
- */
- if (TAILQ_EMPTY(&db_rep->retries))
- select_timeout_p = NULL;
- else {
- retry = TAILQ_FIRST(&db_rep->retries);
-
- __repmgr_timespec_diff_now(
- dbenv, &retry->time, &timeout);
-
- /* Convert the timespec to a timeval. */
- select_timeout.tv_sec = timeout.tv_sec;
- select_timeout.tv_usec = timeout.tv_nsec / NS_PER_US;
- select_timeout_p = &select_timeout;
- }
-
- UNLOCK_MUTEX(db_rep->mutex);
-
- if ((ret = select(maxfd + 1,
- &reads, &writes, NULL, select_timeout_p)) == -1) {
- switch (ret = errno) {
- case EINTR:
- case EWOULDBLOCK:
- LOCK_MUTEX(db_rep->mutex);
- continue; /* simply retry */
- default:
- __db_err(dbenv, ret, "select");
- return (ret);
- }
- }
- nready = ret;
-
- LOCK_MUTEX(db_rep->mutex);
-
- /*
- * The first priority thing we must do is to clean up any
- * pending defunct connections. Otherwise, if they have any
- * lingering pending input, we get very confused if we try to
- * process it.
- *
- * The TAILQ_FOREACH macro would be suitable here, except that
- * it doesn't allow unlinking the current element, which is
- * needed for cleanup_connection.
- */
- for (conn = TAILQ_FIRST(&db_rep->connections);
- conn != NULL;
- conn = next) {
- next = TAILQ_NEXT(conn, entries);
- if (F_ISSET(conn, CONN_DEFUNCT))
- __repmgr_cleanup_connection(dbenv, conn);
- }
-
- if ((ret = __repmgr_retry_connections(dbenv)) != 0)
- goto out;
- if (nready == 0)
- continue;
-
- /*
- * Traverse the linked list. (Again, like TAILQ_FOREACH, except
- * that we need the ability to unlink an element along the way.)
- */
- for (conn = TAILQ_FIRST(&db_rep->connections);
- conn != NULL;
- conn = next) {
- next = TAILQ_NEXT(conn, entries);
- if (F_ISSET(conn, CONN_CONNECTING)) {
- if (FD_ISSET((u_int)conn->fd, &reads) ||
- FD_ISSET((u_int)conn->fd, &writes)) {
- if ((ret = finish_connecting(dbenv,
- conn)) == DB_REP_UNAVAIL) {
- if ((ret =
- __repmgr_bust_connection(
- dbenv, conn, TRUE)) != 0)
- goto out;
- } else if (ret != 0)
- goto out;
- }
- continue;
- }
-
- /*
- * Here, the site is connected, and the FD_SET's are
- * valid.
- */
- if (FD_ISSET((u_int)conn->fd, &writes)) {
- if ((ret = __repmgr_write_some(
- dbenv, conn)) == DB_REP_UNAVAIL) {
- if ((ret =
- __repmgr_bust_connection(dbenv,
- conn, TRUE)) != 0)
- goto out;
- continue;
- } else if (ret != 0)
- goto out;
- }
-
- if (!flow_control &&
- FD_ISSET((u_int)conn->fd, &reads)) {
- if ((ret = __repmgr_read_from_site(dbenv, conn))
- == DB_REP_UNAVAIL) {
- if ((ret =
- __repmgr_bust_connection(dbenv,
- conn, TRUE)) != 0)
- goto out;
- continue;
- } else if (ret != 0)
- goto out;
- }
- }
-
- /*
- * Read any bytes in the signalling pipe. Note that we don't
- * actually need to do anything with them; they're just there to
- * wake us up when necessary.
- */
- if (FD_ISSET((u_int)db_rep->read_pipe, &reads)) {
- if (read(db_rep->read_pipe, buf, sizeof(buf)) <= 0) {
- ret = errno;
- goto out;
- } else if (db_rep->finished) {
- ret = 0;
- goto out;
- }
- }
- if (FD_ISSET((u_int)db_rep->listen_fd, &reads) &&
- (ret = __repmgr_accept(dbenv)) != 0)
- goto out;
- }
-out:
- UNLOCK_MUTEX(db_rep->mutex);
- return (ret);
-}
-
-static int
-finish_connecting(dbenv, conn)
- DB_ENV *dbenv;
- REPMGR_CONNECTION *conn;
-{
- DB_REP *db_rep;
- REPMGR_SITE *site;
- socklen_t len;
- SITE_STRING_BUFFER buffer;
- u_int eid;
- int error, ret;
-
- len = sizeof(error);
- if (getsockopt(
- conn->fd, SOL_SOCKET, SO_ERROR, (sockopt_t)&error, &len) < 0)
- goto err_rpt;
- if (error) {
- errno = error;
- goto err_rpt;
- }
-
- F_CLR(conn, CONN_CONNECTING);
- return (__repmgr_send_handshake(dbenv, conn));
-
-err_rpt:
- db_rep = dbenv->rep_handle;
-
- DB_ASSERT(dbenv, IS_VALID_EID(conn->eid));
- eid = (u_int)conn->eid;
-
- site = SITE_FROM_EID(eid);
- __db_err(dbenv, errno,
- "connecting to %s", __repmgr_format_site_loc(site, buffer));
-
- /* If we've exhausted the list of possible addresses, give up. */
- if (ADDR_LIST_NEXT(&site->net_addr) == NULL)
- return (DB_REP_UNAVAIL);
-
- /*
- * This is just like a little mini-"bust_connection", except that we
- * don't reschedule for later, 'cuz we're just about to try again right
- * now.
- *
- * !!!
- * Which means this must only be called on the select() thread, since
- * only there are we allowed to actually close a connection.
- */
- DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
- __repmgr_cleanup_connection(dbenv, conn);
- ret = __repmgr_connect_site(dbenv, eid);
- DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL);
- return (ret);
-}
diff --git a/db/repmgr/repmgr_queue.c b/db/repmgr/repmgr_queue.c
deleted file mode 100644
index af9c84e5a..000000000
--- a/db/repmgr/repmgr_queue.c
+++ /dev/null
@@ -1,155 +0,0 @@
-/*-
- * See the file LICENSE for redistribution information.
- *
- * Copyright (c) 2006,2007 Oracle. All rights reserved.
- *
- * $Id: repmgr_queue.c,v 1.9 2007/05/17 15:15:51 bostic Exp $
- */
-
-#include "db_config.h"
-
-#define __INCLUDE_NETWORKING 1
-#include "db_int.h"
-
-typedef STAILQ_HEAD(__repmgr_q_header, __repmgr_message) QUEUE_HEADER;
-struct __repmgr_queue {
- int size;
- QUEUE_HEADER header;
-};
-
-/*
- * PUBLIC: int __repmgr_queue_create __P((DB_ENV *, DB_REP *));
- */
-int
-__repmgr_queue_create(dbenv, db_rep)
- DB_ENV *dbenv;
- DB_REP *db_rep;
-{
- REPMGR_QUEUE *q;
- int ret;
-
- if ((ret = __os_calloc(dbenv, 1, sizeof(REPMGR_QUEUE), &q)) != 0)
- return (ret);
- q->size = 0;
- STAILQ_INIT(&q->header);
- db_rep->input_queue = q;
- return (0);
-}
-
-/*
- * Frees not only the queue header, but also any messages that may be on it,
- * along with their data buffers.
- *
- * PUBLIC: void __repmgr_queue_destroy __P((DB_ENV *));
- */
-void
-__repmgr_queue_destroy(dbenv)
- DB_ENV *dbenv;
-{
- REPMGR_QUEUE *q;
- REPMGR_MESSAGE *m;
-
- if ((q = dbenv->rep_handle->input_queue) == NULL)
- return;
-
- while (!STAILQ_EMPTY(&q->header)) {
- m = STAILQ_FIRST(&q->header);
- STAILQ_REMOVE_HEAD(&q->header, entries);
- __os_free(dbenv, m);
- }
- __os_free(dbenv, q);
-}
-
-/*
- * PUBLIC: int __repmgr_queue_get __P((DB_ENV *, REPMGR_MESSAGE **));
- *
- * Get the first input message from the queue and return it to the caller. The
- * caller hereby takes responsibility for the entire message buffer, and should
- * free it when done.
- *
- * Note that caller is NOT expected to hold the mutex. This is asymmetric with
- * put(), because put() is expected to be called in a loop after select, where
- * it's already necessary to be holding the mutex.
- */
-int
-__repmgr_queue_get(dbenv, msgp)
- DB_ENV *dbenv;
- REPMGR_MESSAGE **msgp;
-{
- DB_REP *db_rep;
- REPMGR_QUEUE *q;
- REPMGR_MESSAGE *m;
- int ret;
-
- ret = 0;
- db_rep = dbenv->rep_handle;
- q = db_rep->input_queue;
-
- LOCK_MUTEX(db_rep->mutex);
- while (STAILQ_EMPTY(&q->header) && !db_rep->finished) {
-#ifdef DB_WIN32
- if (!ResetEvent(db_rep->queue_nonempty)) {
- ret = GetLastError();
- goto err;
- }
- if (SignalObjectAndWait(db_rep->mutex, db_rep->queue_nonempty,
- INFINITE, FALSE) != WAIT_OBJECT_0) {
- ret = GetLastError();
- goto err;
- }
- LOCK_MUTEX(db_rep->mutex);
-#else
- if ((ret = pthread_cond_wait(&db_rep->queue_nonempty,
- &db_rep->mutex)) != 0)
- goto err;
-#endif
- }
- if (db_rep->finished)
- ret = DB_REP_UNAVAIL;
- else {
- m = STAILQ_FIRST(&q->header);
- STAILQ_REMOVE_HEAD(&q->header, entries);
- q->size--;
- *msgp = m;
- }
-
-err:
- UNLOCK_MUTEX(db_rep->mutex);
- return (ret);
-}
-
-/*
- * PUBLIC: int __repmgr_queue_put __P((DB_ENV *, REPMGR_MESSAGE *));
- *
- * !!!
- * Caller must hold repmgr->mutex.
- */
-int
-__repmgr_queue_put(dbenv, msg)
- DB_ENV *dbenv;
- REPMGR_MESSAGE *msg;
-{
- DB_REP *db_rep;
- REPMGR_QUEUE *q;
-
- db_rep = dbenv->rep_handle;
- q = db_rep->input_queue;
-
- STAILQ_INSERT_TAIL(&q->header, msg, entries);
- q->size++;
-
- return (__repmgr_signal(&db_rep->queue_nonempty));
-}
-
-/*
- * PUBLIC: int __repmgr_queue_size __P((DB_ENV *));
- *
- * !!!
- * Caller must hold repmgr->mutex.
- */
-int
-__repmgr_queue_size(dbenv)
- DB_ENV *dbenv;
-{
- return (dbenv->rep_handle->input_queue->size);
-}
diff --git a/db/repmgr/repmgr_sel.c b/db/repmgr/repmgr_sel.c
deleted file mode 100644
index f1aa2ad88..000000000
--- a/db/repmgr/repmgr_sel.c
+++ /dev/null
@@ -1,875 +0,0 @@
-/*-
- * See the file LICENSE for redistribution information.
- *
- * Copyright (c) 2006,2007 Oracle. All rights reserved.
- *
- * $Id: repmgr_sel.c,v 1.36 2007/06/11 18:29:34 alanb Exp $
- */
-
-#include "db_config.h"
-
-#define __INCLUDE_NETWORKING 1
-#include "db_int.h"
-
-static int __repmgr_connect __P((DB_ENV*, socket_t *, REPMGR_SITE *));
-static int dispatch_phase_completion __P((DB_ENV *, REPMGR_CONNECTION *));
-static int notify_handshake __P((DB_ENV *, REPMGR_CONNECTION *));
-static int record_ack __P((DB_ENV *, REPMGR_SITE *, DB_REPMGR_ACK *));
-static int __repmgr_try_one __P((DB_ENV *, u_int));
-
-/*
- * PUBLIC: void *__repmgr_select_thread __P((void *));
- */
-void *
-__repmgr_select_thread(args)
- void *args;
-{
- DB_ENV *dbenv = args;
- int ret;
-
- if ((ret = __repmgr_select_loop(dbenv)) != 0) {
- __db_err(dbenv, ret, "select loop failed");
- __repmgr_thread_failure(dbenv, ret);
- }
- return (NULL);
-}
-
-/*
- * PUBLIC: int __repmgr_accept __P((DB_ENV *));
- *
- * !!!
- * Only ever called in the select() thread, since we may call
- * __repmgr_bust_connection(..., TRUE).
- */
-int
-__repmgr_accept(dbenv)
- DB_ENV *dbenv;
-{
- DB_REP *db_rep;
- REPMGR_CONNECTION *conn;
- struct sockaddr_in siaddr;
- socklen_t addrlen;
- socket_t s;
- int ret;
-#ifdef DB_WIN32
- WSAEVENT event_obj;
-#endif
-
- db_rep = dbenv->rep_handle;
- addrlen = sizeof(siaddr);
- if ((s = accept(db_rep->listen_fd, (struct sockaddr *)&siaddr,
- &addrlen)) == -1) {
- /*
- * Some errors are innocuous and so should be ignored. MSDN
- * Library documents the Windows ones; the Unix ones are
- * advocated in Stevens' UNPv1, section 16.6; and Linux
- * Application Development, p. 416.
- */
- switch (ret = net_errno) {
-#ifdef DB_WIN32
- case WSAECONNRESET:
- case WSAEWOULDBLOCK:
-#else
- case EINTR:
- case EWOULDBLOCK:
- case ECONNABORTED:
- case ENETDOWN:
-#ifdef EPROTO
- case EPROTO:
-#endif
- case ENOPROTOOPT:
- case EHOSTDOWN:
-#ifdef ENONET
- case ENONET:
-#endif
- case EHOSTUNREACH:
- case EOPNOTSUPP:
- case ENETUNREACH:
-#endif
- RPRINT(dbenv, (dbenv,
- "accept error %d considered innocuous", ret));
- return (0);
- default:
- __db_err(dbenv, ret, "accept error");
- return (ret);
- }
- }
- RPRINT(dbenv, (dbenv, "accepted a new connection"));
-
- if ((ret = __repmgr_set_nonblocking(s)) != 0) {
- __db_err(dbenv, ret, "can't set nonblock after accept");
- (void)closesocket(s);
- return (ret);
- }
-
-#ifdef DB_WIN32
- if ((event_obj = WSACreateEvent()) == WSA_INVALID_EVENT) {
- ret = net_errno;
- __db_err(dbenv, ret, "can't create WSA event");
- (void)closesocket(s);
- return (ret);
- }
- if (WSAEventSelect(s, event_obj, FD_READ|FD_CLOSE) == SOCKET_ERROR) {
- ret = net_errno;
- __db_err(dbenv, ret, "can't set desired event bits");
- (void)WSACloseEvent(event_obj);
- (void)closesocket(s);
- return (ret);
- }
-#endif
- if ((ret = __repmgr_new_connection(dbenv, &conn, s, 0)) != 0) {
-#ifdef DB_WIN32
- (void)WSACloseEvent(event_obj);
-#endif
- (void)closesocket(s);
- return (ret);
- }
- conn->eid = -1;
-#ifdef DB_WIN32
- conn->event_object = event_obj;
-#endif
-
- switch (ret = __repmgr_send_handshake(dbenv, conn)) {
- case 0:
- return (0);
- case DB_REP_UNAVAIL:
- return (__repmgr_bust_connection(dbenv, conn, TRUE));
- default:
- return (ret);
- }
-}
-
-/*
- * Initiates connection attempts for any sites on the idle list whose retry
- * times have expired.
- *
- * PUBLIC: int __repmgr_retry_connections __P((DB_ENV *));
- *
- * !!!
- * Assumes caller holds the mutex.
- */
-int
-__repmgr_retry_connections(dbenv)
- DB_ENV *dbenv;
-{
- DB_REP *db_rep;
- REPMGR_RETRY *retry;
- db_timespec now;
- u_int eid;
- int ret;
-
- db_rep = dbenv->rep_handle;
- __os_gettime(dbenv, &now);
-
- while (!TAILQ_EMPTY(&db_rep->retries)) {
- retry = TAILQ_FIRST(&db_rep->retries);
- if (timespeccmp(&retry->time, &now, >=))
- break; /* since items are in time order */
-
- TAILQ_REMOVE(&db_rep->retries, retry, entries);
-
- eid = retry->eid;
- __os_free(dbenv, retry);
-
- if ((ret = __repmgr_try_one(dbenv, eid)) != 0)
- return (ret);
- }
- return (0);
-}
-
-/*
- * PUBLIC: int __repmgr_first_try_connections __P((DB_ENV *));
- *
- * !!!
- * Assumes caller holds the mutex.
- */
-int
-__repmgr_first_try_connections(dbenv)
- DB_ENV *dbenv;
-{
- DB_REP *db_rep;
- u_int eid;
- int ret;
-
- db_rep = dbenv->rep_handle;
- for (eid=0; eid<db_rep->site_cnt; eid++)
- if ((ret = __repmgr_try_one(dbenv, eid)) != 0)
- return (ret);
- return (0);
-}
-
-/*
- * Makes a best-effort attempt to connect to the indicated site. Returns a
- * non-zero error indication only for disastrous failures. For re-tryable
- * errors, we will have scheduled another attempt, and that can be considered
- * success enough.
- */
-static int
-__repmgr_try_one(dbenv, eid)
- DB_ENV *dbenv;
- u_int eid;
-{
- DB_REP *db_rep;
- ADDRINFO *list;
- repmgr_netaddr_t *addr;
- int ret;
-
- db_rep = dbenv->rep_handle;
-
- /*
- * If have never yet successfully resolved this site's host name, try to
- * do so now.
- *
- * Throughout all the rest of repmgr, we almost never do any sort of
- * blocking operation in the select thread. This is the sole exception
- * to that rule. Fortunately, it should rarely happen:
- *
- * - for a site that we only learned about because it connected to us:
- * not only were we not configured to know about it, but we also never
- * got a NEWSITE message about it. And even then only if the
- * connection fails and we want to retry it from this end;
- *
- * - if the name look-up system (e.g., DNS) is not working (let's hope
- * it's temporary), or the host name is not found.
- */
- addr = &SITE_FROM_EID(eid)->net_addr;
- if (ADDR_LIST_FIRST(addr) == NULL) {
- if ((ret = __repmgr_getaddr(
- dbenv, addr->host, addr->port, 0, &list)) == 0) {
- addr->address_list = list;
- (void)ADDR_LIST_FIRST(addr);
- } else if (ret == DB_REP_UNAVAIL)
- return (__repmgr_schedule_connection_attempt(
- dbenv, eid, FALSE));
- else
- return (ret);
- }
-
- /* Here, when we have a valid address. */
- return (__repmgr_connect_site(dbenv, eid));
-}
-
-/*
- * Tries to establish a connection with the site indicated by the given eid,
- * starting with the "current" element of its address list and trying as many
- * addresses as necessary until the list is exhausted.
- *
- * !!!
- * Only ever called in the select() thread, since we may call
- * __repmgr_bust_connection(..., TRUE).
- *
- * PUBLIC: int __repmgr_connect_site __P((DB_ENV *, u_int eid));
- */
-int
-__repmgr_connect_site(dbenv, eid)
- DB_ENV *dbenv;
- u_int eid;
-{
- DB_REP *db_rep;
- REPMGR_SITE *site;
- REPMGR_CONNECTION *con;
- socket_t s;
- u_int32_t flags;
- int ret;
-#ifdef DB_WIN32
- long desired_event;
- WSAEVENT event_obj;
-#endif
-
- db_rep = dbenv->rep_handle;
- site = SITE_FROM_EID(eid);
-
- flags = 0;
- switch (ret = __repmgr_connect(dbenv, &s, site)) {
- case 0:
- flags = 0;
-#ifdef DB_WIN32
- desired_event = FD_READ|FD_CLOSE;
-#endif
- break;
- case INPROGRESS:
- flags = CONN_CONNECTING;
-#ifdef DB_WIN32
- desired_event = FD_CONNECT;
-#endif
- break;
- default:
- STAT(db_rep->region->mstat.st_connect_fail++);
- return (
- __repmgr_schedule_connection_attempt(dbenv, eid, FALSE));
- }
-
-#ifdef DB_WIN32
- if ((event_obj = WSACreateEvent()) == WSA_INVALID_EVENT) {
- ret = net_errno;
- __db_err(dbenv, ret, "can't create WSA event");
- (void)closesocket(s);
- return (ret);
- }
- if (WSAEventSelect(s, event_obj, desired_event) == SOCKET_ERROR) {
- ret = net_errno;
- __db_err(dbenv, ret, "can't set desired event bits");
- (void)WSACloseEvent(event_obj);
- (void)closesocket(s);
- return (ret);
- }
-#endif
-
- if ((ret = __repmgr_new_connection(dbenv, &con, s, flags))
- != 0) {
-#ifdef DB_WIN32
- (void)WSACloseEvent(event_obj);
-#endif
- (void)closesocket(s);
- return (ret);
- }
-#ifdef DB_WIN32
- con->event_object = event_obj;
-#endif
-
- if (flags == 0) {
- switch (ret = __repmgr_send_handshake(dbenv, con)) {
- case 0:
- break;
- case DB_REP_UNAVAIL:
- return (__repmgr_bust_connection(dbenv, con, TRUE));
- default:
- return (ret);
- }
- }
-
- con->eid = (int)eid;
-
- site->ref.conn = con;
- site->state = SITE_CONNECTED;
- return (0);
-}
-
-static int
-__repmgr_connect(dbenv, socket_result, site)
- DB_ENV *dbenv;
- socket_t *socket_result;
- REPMGR_SITE *site;
-{
- repmgr_netaddr_t *addr;
- ADDRINFO *ai;
- socket_t s;
- char *why;
- int ret;
- SITE_STRING_BUFFER buffer;
-
- /*
- * Lint doesn't know about DB_ASSERT, so it can't tell that this
- * loop will always get executed at least once, giving 'why' a value.
- */
- COMPQUIET(why, "");
- addr = &site->net_addr;
- ai = ADDR_LIST_CURRENT(addr);
- DB_ASSERT(dbenv, ai != NULL);
- for (; ai != NULL; ai = ADDR_LIST_NEXT(addr)) {
-
- if ((s = socket(ai->ai_family,
- ai->ai_socktype, ai->ai_protocol)) == SOCKET_ERROR) {
- why = "can't create socket to connect";
- continue;
- }
-
- if ((ret = __repmgr_set_nonblocking(s)) != 0) {
- __db_err(dbenv,
- ret, "can't make nonblock socket to connect");
- (void)closesocket(s);
- return (ret);
- }
-
- if (connect(s, ai->ai_addr, (socklen_t)ai->ai_addrlen) != 0)
- ret = net_errno;
-
- if (ret == 0 || ret == INPROGRESS) {
- *socket_result = s;
- RPRINT(dbenv, (dbenv,
- "init connection to %s with result %d",
- __repmgr_format_site_loc(site, buffer), ret));
- return (ret);
- }
-
- why = "connection failed";
- (void)closesocket(s);
- }
-
- /* We've exhausted all possible addresses. */
- ret = net_errno;
- __db_err(dbenv, ret, "%s to %s", why,
- __repmgr_format_site_loc(site, buffer));
- return (ret);
-}
-
-/*
- * PUBLIC: int __repmgr_send_handshake __P((DB_ENV *, REPMGR_CONNECTION *));
- */
-int
-__repmgr_send_handshake(dbenv, conn)
- DB_ENV *dbenv;
- REPMGR_CONNECTION *conn;
-{
- DB_REP *db_rep;
- REP *rep;
- repmgr_netaddr_t *my_addr;
- DB_REPMGR_HANDSHAKE buffer;
- DBT cntrl, rec;
-
- DB_ASSERT(dbenv, !F_ISSET(conn, CONN_CONNECTING | CONN_DEFUNCT));
-
- db_rep = dbenv->rep_handle;
- rep = db_rep->region;
- my_addr = &db_rep->my_addr;
-
- /*
- * Remind us that we need to fix priority on the rep API not to be an
- * int, if we're going to pass it across the wire.
- */
- DB_ASSERT(dbenv, sizeof(u_int32_t) >= sizeof(int));
-
- buffer.version = DB_REPMGR_VERSION;
- buffer.priority = htonl((u_int32_t)rep->priority);
- buffer.port = my_addr->port;
- cntrl.data = &buffer;
- cntrl.size = sizeof(buffer);
-
- DB_SET_DBT(rec, my_addr->host, strlen(my_addr->host) + 1);
-
- return (__repmgr_send_one(dbenv, conn, REPMGR_HANDSHAKE, &cntrl, &rec));
-}
-
-/*
- * PUBLIC: int __repmgr_read_from_site __P((DB_ENV *, REPMGR_CONNECTION *));
- *
- * !!!
- * Caller is assumed to hold repmgr->mutex, 'cuz we call queue_put() from here.
- */
-int
-__repmgr_read_from_site(dbenv, conn)
- DB_ENV *dbenv;
- REPMGR_CONNECTION *conn;
-{
- SITE_STRING_BUFFER buffer;
- size_t nr;
- int ret;
-
- /*
- * Keep reading pieces as long as we're making some progress, or until
- * we complete the current read phase.
- */
- for (;;) {
- if ((ret = __repmgr_readv(conn->fd,
- &conn->iovecs.vectors[conn->iovecs.offset],
- conn->iovecs.count - conn->iovecs.offset, &nr)) != 0) {
- switch (ret) {
-#ifndef DB_WIN32
- case EINTR:
- continue;
-#endif
- case WOULDBLOCK:
- return (0);
- default:
- (void)__repmgr_format_eid_loc(dbenv->rep_handle,
- conn->eid, buffer);
- __db_err(dbenv, ret,
- "can't read from %s", buffer);
- STAT(dbenv->rep_handle->
- region->mstat.st_connection_drop++);
- return (DB_REP_UNAVAIL);
- }
- }
-
- if (nr > 0) {
- if (__repmgr_update_consumed(&conn->iovecs, nr))
- return (dispatch_phase_completion(dbenv,
- conn));
- } else {
- (void)__repmgr_format_eid_loc(dbenv->rep_handle,
- conn->eid, buffer);
- __db_errx(dbenv, "EOF on connection from %s", buffer);
- STAT(dbenv->rep_handle->
- region->mstat.st_connection_drop++);
- return (DB_REP_UNAVAIL);
- }
- }
-}
-
-/*
- * Handles whatever needs to be done upon the completion of a reading phase on a
- * given connection.
- */
-static int
-dispatch_phase_completion(dbenv, conn)
- DB_ENV *dbenv;
- REPMGR_CONNECTION *conn;
-{
-#define MEM_ALIGN sizeof(double)
- DB_REP *db_rep;
- DB_REPMGR_ACK *ack;
- REPMGR_SITE *site;
- DB_REPMGR_HANDSHAKE *handshake;
- REPMGR_RETRY *retry;
- repmgr_netaddr_t addr;
- DBT *dbt;
- u_int32_t control_size, rec_size;
- size_t memsize, control_offset, rec_offset;
- void *membase;
- char *host;
- u_int port;
- int ret, eid;
-
- db_rep = dbenv->rep_handle;
- switch (conn->reading_phase) {
- case SIZES_PHASE:
- /*
- * We've received the header: a message type and the lengths of
- * the two pieces of the message. Set up buffers to read the
- * two pieces.
- */
-
- if (conn->msg_type != REPMGR_HANDSHAKE &&
- !IS_VALID_EID(conn->eid)) {
- __db_errx(dbenv,
- "expected handshake as first msg from passively connected site");
- return (DB_REP_UNAVAIL);
- }
-
- __repmgr_iovec_init(&conn->iovecs);
- control_size = ntohl(conn->control_size_buf);
- rec_size = ntohl(conn->rec_size_buf);
- if (conn->msg_type == REPMGR_REP_MESSAGE) {
- /*
- * Allocate a block of memory large enough to hold a
- * DB_REPMGR_MESSAGE wrapper, plus the (one or) two DBT
- * data areas that it points to. Start by calculating
- * the total memory needed, rounding up for the start of
- * each DBT, to ensure possible alignment requirements.
- */
- memsize = (size_t)
- DB_ALIGN(sizeof(REPMGR_MESSAGE), MEM_ALIGN);
- control_offset = memsize;
- memsize += control_size;
- if (rec_size > 0) {
- memsize = (size_t)DB_ALIGN(memsize, MEM_ALIGN);
- rec_offset = memsize;
- memsize += rec_size;
- } else
- COMPQUIET(rec_offset, 0);
- if ((ret = __os_malloc(dbenv, memsize, &membase)) != 0)
- return (ret);
- conn->input.rep_message = membase;
- memset(&conn->input.rep_message->control, 0,
- sizeof(DBT));
- memset(&conn->input.rep_message->rec, 0, sizeof(DBT));
- conn->input.rep_message->originating_eid = conn->eid;
- conn->input.rep_message->control.size = control_size;
- conn->input.rep_message->control.data =
- (u_int8_t*)membase + control_offset;
- __repmgr_add_buffer(&conn->iovecs,
- conn->input.rep_message->control.data,
- control_size);
-
- conn->input.rep_message->rec.size = rec_size;
- if (rec_size > 0) {
- conn->input.rep_message->rec.data =
- (u_int8_t*)membase + rec_offset;
- __repmgr_add_buffer(&conn->iovecs,
- conn->input.rep_message->rec.data,
- rec_size);
- } else
- conn->input.rep_message->rec.data = NULL;
-
- } else {
- if (control_size == 0) {
- __db_errx(
- dbenv, "illegal size for non-rep msg");
- return (DB_REP_UNAVAIL);
- }
- conn->input.repmgr_msg.cntrl.size = control_size;
- conn->input.repmgr_msg.rec.size = rec_size;
-
- dbt = &conn->input.repmgr_msg.cntrl;
- dbt->size = control_size;
- if ((ret = __os_malloc(dbenv, control_size,
- &dbt->data)) != 0)
- return (ret);
- __repmgr_add_dbt(&conn->iovecs, dbt);
-
- dbt = &conn->input.repmgr_msg.rec;
- if ((dbt->size = rec_size) > 0) {
- if ((ret = __os_malloc(dbenv, rec_size,
- &dbt->data)) != 0) {
- __os_free(dbenv,
- conn->input.repmgr_msg.cntrl.data);
- return (ret);
- }
- __repmgr_add_dbt(&conn->iovecs, dbt);
- }
- }
-
- conn->reading_phase = DATA_PHASE;
- break;
-
- case DATA_PHASE:
- /*
- * We have a complete message, so process it. Acks and
- * handshakes get processed here, in line. Regular rep messages
- * get posted to a queue, to be handled by a thread from the
- * message thread pool.
- */
- switch (conn->msg_type) {
- case REPMGR_ACK:
- /*
- * Extract the LSN. Save it only if it is an
- * improvement over what the site has already ack'ed.
- */
- ack = conn->input.repmgr_msg.cntrl.data;
- if (conn->input.repmgr_msg.cntrl.size != sizeof(*ack) ||
- conn->input.repmgr_msg.rec.size != 0) {
- __db_errx(dbenv, "bad ack msg size");
- return (DB_REP_UNAVAIL);
- }
- if ((ret = record_ack(dbenv, SITE_FROM_EID(conn->eid),
- ack)) != 0)
- return (ret);
- __os_free(dbenv, conn->input.repmgr_msg.cntrl.data);
- break;
-
- case REPMGR_HANDSHAKE:
- handshake = conn->input.repmgr_msg.cntrl.data;
- if (conn->input.repmgr_msg.cntrl.size >=
- sizeof(handshake->version) &&
- handshake->version != DB_REPMGR_VERSION) {
- __db_errx(dbenv,
- "mismatched repmgr message protocol version (%lu)",
- (u_long)handshake->version);
- return (DB_REP_UNAVAIL);
- }
- if (conn->input.repmgr_msg.cntrl.size !=
- sizeof(*handshake) ||
- conn->input.repmgr_msg.rec.size == 0) {
- __db_errx(dbenv, "bad handshake msg size");
- return (DB_REP_UNAVAIL);
- }
-
- port = handshake->port;
- host = conn->input.repmgr_msg.rec.data;
- host[conn->input.repmgr_msg.rec.size-1] = '\0';
-
- RPRINT(dbenv, (dbenv,
- "got handshake %s:%u, pri %lu", host, port,
- (u_long)ntohl(handshake->priority)));
-
- if (IS_VALID_EID(conn->eid)) {
- /*
- * We must have initiated this as an outgoing
- * connection, since we already know the EID.
- * All we need from the handshake is the
- * priority.
- */
- site = SITE_FROM_EID(conn->eid);
- RPRINT(dbenv, (dbenv,
- "handshake from connection to %s:%lu",
- site->net_addr.host,
- (u_long)site->net_addr.port));
- } else {
- if (IS_VALID_EID(eid =
- __repmgr_find_site(dbenv, host, port))) {
- site = SITE_FROM_EID(eid);
- if (site->state == SITE_IDLE) {
- RPRINT(dbenv, (dbenv,
- "handshake from previously idle site"));
- retry = site->ref.retry;
- TAILQ_REMOVE(&db_rep->retries,
- retry, entries);
- __os_free(dbenv, retry);
-
- conn->eid = eid;
- site->state = SITE_CONNECTED;
- site->ref.conn = conn;
- } else {
- /*
- * We got an incoming connection
- * for a site we were already
- * connected to; discard it.
- */
- __db_errx(dbenv,
- "redundant incoming connection will be ignored");
- return (DB_REP_UNAVAIL);
- }
- } else {
- RPRINT(dbenv, (dbenv,
- "handshake introduces unknown site"));
- if ((ret = __repmgr_pack_netaddr(
- dbenv, host, port, NULL,
- &addr)) != 0)
- return (ret);
- if ((ret = __repmgr_new_site(
- dbenv, &site, &addr,
- SITE_CONNECTED)) != 0) {
- __repmgr_cleanup_netaddr(dbenv,
- &addr);
- return (ret);
- }
- conn->eid = EID_FROM_SITE(site);
- site->ref.conn = conn;
- }
- }
-
- /* TODO: change priority to be u_int32_t. */
- DB_ASSERT(dbenv, sizeof(int) == sizeof(u_int32_t));
- site->priority = (int)ntohl(handshake->priority);
-
- if ((ret = notify_handshake(dbenv, conn)) != 0)
- return (ret);
-
- __os_free(dbenv, conn->input.repmgr_msg.cntrl.data);
- __os_free(dbenv, conn->input.repmgr_msg.rec.data);
- break;
-
- case REPMGR_REP_MESSAGE:
- if ((ret = __repmgr_queue_put(dbenv,
- conn->input.rep_message)) != 0)
- return (ret);
- /*
- * The queue has taken over responsibility for the
- * rep_message buffer, and will free it later.
- */
- break;
-
- default:
- __db_errx(dbenv, "unknown msg type rcvd: %d",
- (int)conn->msg_type);
- return (DB_REP_UNAVAIL);
- }
-
- __repmgr_reset_for_reading(conn);
- break;
-
- default:
- DB_ASSERT(dbenv, FALSE);
- }
-
- return (0);
-}
-
-/*
- * Performs any processing needed upon the receipt of a handshake.
- *
- * !!!
- * Caller must hold mutex.
- */
-static int
-notify_handshake(dbenv, conn)
- DB_ENV *dbenv;
- REPMGR_CONNECTION *conn;
-{
- DB_REP *db_rep;
-
- COMPQUIET(conn, NULL);
-
- db_rep = dbenv->rep_handle;
- /*
- * If we're moping around wishing we knew who the master was, then
- * getting in touch with another site might finally provide sufficient
- * connectivity to find out. But just do this once, because otherwise
- * we get messages while the subsequent rep_start operations are going
- * on, and rep tosses them in that case.
- */
- if (db_rep->master_eid == DB_EID_INVALID && !db_rep->done_one) {
- db_rep->done_one = TRUE;
- RPRINT(dbenv, (dbenv,
- "handshake with no known master to wake election thread"));
- return (__repmgr_init_election(dbenv, ELECT_REPSTART));
- }
- return (0);
-}
-
-static int
-record_ack(dbenv, site, ack)
- DB_ENV *dbenv;
- REPMGR_SITE *site;
- DB_REPMGR_ACK *ack;
-{
- DB_REP *db_rep;
- SITE_STRING_BUFFER buffer;
- int ret;
-
- db_rep = dbenv->rep_handle;
-
- /* Ignore stale acks. */
- if (ack->generation < db_rep->generation) {
- RPRINT(dbenv, (dbenv,
- "ignoring stale ack (%lu<%lu), from %s",
- (u_long)ack->generation, (u_long)db_rep->generation,
- __repmgr_format_site_loc(site, buffer)));
- return (0);
- }
- RPRINT(dbenv, (dbenv,
- "got ack [%lu][%lu](%lu) from %s", (u_long)ack->lsn.file,
- (u_long)ack->lsn.offset, (u_long)ack->generation,
- __repmgr_format_site_loc(site, buffer)));
-
- if (ack->generation == db_rep->generation &&
- log_compare(&ack->lsn, &site->max_ack) == 1) {
- memcpy(&site->max_ack, &ack->lsn, sizeof(DB_LSN));
- if ((ret = __repmgr_wake_waiting_senders(dbenv)) != 0)
- return (ret);
- }
- return (0);
-}
-
-/*
- * PUBLIC: int __repmgr_write_some __P((DB_ENV *, REPMGR_CONNECTION *));
- */
-int
-__repmgr_write_some(dbenv, conn)
- DB_ENV *dbenv;
- REPMGR_CONNECTION *conn;
-{
- REPMGR_FLAT *msg;
- QUEUED_OUTPUT *output;
- int bytes, ret;
-
- while (!STAILQ_EMPTY(&conn->outbound_queue)) {
- output = STAILQ_FIRST(&conn->outbound_queue);
- msg = output->msg;
- if ((bytes = send(conn->fd, &msg->data[output->offset],
- (size_t)msg->length - output->offset, 0)) == SOCKET_ERROR) {
- if ((ret = net_errno) == WOULDBLOCK)
- return (0);
- else {
- __db_err(dbenv, ret, "writing data");
- STAT(dbenv->rep_handle->
- region->mstat.st_connection_drop++);
- return (DB_REP_UNAVAIL);
- }
- }
-
- if ((output->offset += (size_t)bytes) >= msg->length) {
- STAILQ_REMOVE_HEAD(&conn->outbound_queue, entries);
- __os_free(dbenv, output);
- conn->out_queue_length--;
- if (--msg->ref_count <= 0)
- __os_free(dbenv, msg);
- }
- }
-
-#ifdef DB_WIN32
- /*
- * With the queue now empty, it's time to relinquish ownership of this
- * connection again, so that the next call to send() can write the
- * message in line, instead of posting it to the queue for us.
- */
- if (WSAEventSelect(conn->fd, conn->event_object, FD_READ|FD_CLOSE)
- == SOCKET_ERROR) {
- ret = net_errno;
- __db_err(dbenv, ret, "can't remove FD_WRITE event bit");
- return (ret);
- }
-#endif
-
- return (0);
-}
diff --git a/db/repmgr/repmgr_stat.c b/db/repmgr/repmgr_stat.c
deleted file mode 100644
index 825bfa765..000000000
--- a/db/repmgr/repmgr_stat.c
+++ /dev/null
@@ -1,298 +0,0 @@
-/*-
- * See the file LICENSE for redistribution information.
- *
- * Copyright (c) 2005,2007 Oracle. All rights reserved.
- *
- * $Id: repmgr_stat.c,v 1.35 2007/06/22 18:26:52 bostic Exp $
- */
-
-#include "db_config.h"
-
-#define __INCLUDE_NETWORKING 1
-#include "db_int.h"
-
-#ifdef HAVE_STATISTICS
-static int __repmgr_print_all __P((DB_ENV *, u_int32_t));
-static int __repmgr_print_sites __P((DB_ENV *));
-static int __repmgr_print_stats __P((DB_ENV *, u_int32_t));
-static int __repmgr_stat __P((DB_ENV *, DB_REPMGR_STAT **, u_int32_t));
-static int __repmgr_stat_print __P((DB_ENV *, u_int32_t));
-
-/*
- * __repmgr_stat_pp --
- * DB_ENV->repmgr_stat pre/post processing.
- *
- * PUBLIC: int __repmgr_stat_pp __P((DB_ENV *, DB_REPMGR_STAT **, u_int32_t));
- */
-int
-__repmgr_stat_pp(dbenv, statp, flags)
- DB_ENV *dbenv;
- DB_REPMGR_STAT **statp;
- u_int32_t flags;
-{
- DB_THREAD_INFO *ip;
- int ret;
-
- PANIC_CHECK(dbenv);
- ENV_REQUIRES_CONFIG_XX(
- dbenv, rep_handle, "DB_ENV->repmgr_stat", DB_INIT_REP);
-
- if ((ret = __db_fchk(dbenv,
- "DB_ENV->repmgr_stat", flags, DB_STAT_CLEAR)) != 0)
- return (ret);
-
- ENV_ENTER(dbenv, ip);
- ret = __repmgr_stat(dbenv, statp, flags);
- ENV_LEAVE(dbenv, ip);
-
- return (ret);
-}
-
-/*
- * __repmgr_stat --
- * DB_ENV->repmgr_stat.
- */
-static int
-__repmgr_stat(dbenv, statp, flags)
- DB_ENV *dbenv;
- DB_REPMGR_STAT **statp;
- u_int32_t flags;
-{
- DB_REP *db_rep;
- DB_REPMGR_STAT *stats;
- int ret;
-
- db_rep = dbenv->rep_handle;
-
- *statp = NULL;
-
- /* Allocate a stat struct to return to the user. */
- if ((ret = __os_umalloc(dbenv, sizeof(DB_REPMGR_STAT), &stats)) != 0)
- return (ret);
-
- memcpy(stats, &db_rep->region->mstat, sizeof(*stats));
- if (LF_ISSET(DB_STAT_CLEAR))
- memset(&db_rep->region->mstat, 0, sizeof(DB_REPMGR_STAT));
-
- *statp = stats;
- return (0);
-}
-
-/*
- * __repmgr_stat_print_pp --
- * DB_ENV->repmgr_stat_print pre/post processing.
- *
- * PUBLIC: int __repmgr_stat_print_pp __P((DB_ENV *, u_int32_t));
- */
-int
-__repmgr_stat_print_pp(dbenv, flags)
- DB_ENV *dbenv;
- u_int32_t flags;
-{
- DB_THREAD_INFO *ip;
- int ret;
-
- PANIC_CHECK(dbenv);
- ENV_REQUIRES_CONFIG_XX(
- dbenv, rep_handle, "DB_ENV->repmgr_stat_print", DB_INIT_REP);
-
- if ((ret = __db_fchk(dbenv, "DB_ENV->repmgr_stat_print",
- flags, DB_STAT_ALL | DB_STAT_CLEAR)) != 0)
- return (ret);
-
- ENV_ENTER(dbenv, ip);
- ret = __repmgr_stat_print(dbenv, flags);
- ENV_LEAVE(dbenv, ip);
-
- return (ret);
-}
-
-int
-__repmgr_stat_print(dbenv, flags)
- DB_ENV *dbenv;
- u_int32_t flags;
-{
- u_int32_t orig_flags;
- int ret;
-
- orig_flags = flags;
- LF_CLR(DB_STAT_CLEAR | DB_STAT_SUBSYSTEM);
- if (flags == 0 || LF_ISSET(DB_STAT_ALL)) {
- if ((ret = __repmgr_print_stats(dbenv, orig_flags)) == 0)
- ret = __repmgr_print_sites(dbenv);
- if (flags == 0 || ret != 0)
- return (ret);
- }
-
- if (LF_ISSET(DB_STAT_ALL) &&
- (ret = __repmgr_print_all(dbenv, orig_flags)) != 0)
- return (ret);
-
- return (0);
-}
-
-static int
-__repmgr_print_stats(dbenv, flags)
- DB_ENV *dbenv;
- u_int32_t flags;
-{
- DB_REPMGR_STAT *sp;
- int ret;
-
- if ((ret = __repmgr_stat(dbenv, &sp, flags)) != 0)
- return (ret);
-
- __db_dl(dbenv, "Number of PERM messages not acknowledged",
- (u_long)sp->st_perm_failed);
- __db_dl(dbenv, "Number of messages queued due to network delay",
- (u_long)sp->st_msgs_queued);
- __db_dl(dbenv, "Number of messages discarded due to queue length",
- (u_long)sp->st_msgs_dropped);
- __db_dl(dbenv, "Number of existing connections dropped",
- (u_long)sp->st_connection_drop);
- __db_dl(dbenv, "Number of failed new connection attempts",
- (u_long)sp->st_connect_fail);
-
- __os_ufree(dbenv, sp);
-
- return (0);
-}
-
-static int
-__repmgr_print_sites(dbenv)
- DB_ENV *dbenv;
-{
- DB_REPMGR_SITE *list;
- u_int count, i;
- int ret;
-
- if ((ret = __repmgr_site_list(dbenv, &count, &list)) != 0)
- return (ret);
-
- if (count == 0)
- return (0);
-
- __db_msg(dbenv, "%s", DB_GLOBAL(db_line));
- __db_msg(dbenv, "DB_REPMGR site information:");
-
- for (i = 0; i < count; ++i) {
- __db_msg(dbenv, "%s (eid: %d, port: %u, %sconnected)",
- list[i].host, list[i].eid, list[i].port,
- list[i].status == DB_REPMGR_CONNECTED ? "" : "dis");
- }
-
- __os_ufree(dbenv, list);
-
- return (0);
-}
-
-/*
- * __repmgr_print_all --
- * Display debugging replication manager statistics.
- */
-static int
-__repmgr_print_all(dbenv, flags)
- DB_ENV *dbenv;
- u_int32_t flags;
-{
- COMPQUIET(dbenv, NULL);
- COMPQUIET(flags, 0);
- return (0);
-}
-
-#else /* !HAVE_STATISTICS */
-
-int
-__repmgr_stat_pp(dbenv, statp, flags)
- DB_ENV *dbenv;
- DB_REPMGR_STAT **statp;
- u_int32_t flags;
-{
- COMPQUIET(statp, NULL);
- COMPQUIET(flags, 0);
-
- return (__db_stat_not_built(dbenv));
-}
-
-int
-__repmgr_stat_print_pp(dbenv, flags)
- DB_ENV *dbenv;
- u_int32_t flags;
-{
- COMPQUIET(flags, 0);
-
- return (__db_stat_not_built(dbenv));
-}
-#endif
-
-/*
- * PUBLIC: int __repmgr_site_list __P((DB_ENV *, u_int *, DB_REPMGR_SITE **));
- */
-int
-__repmgr_site_list(dbenv, countp, listp)
- DB_ENV *dbenv;
- u_int *countp;
- DB_REPMGR_SITE **listp;
-{
- DB_REP *db_rep;
- DB_REPMGR_SITE *status;
- REPMGR_SITE *site;
- size_t array_size, total_size;
- u_int count, i;
- int locked, ret;
- char *name;
-
- db_rep = dbenv->rep_handle;
- if (REPMGR_SYNC_INITED(db_rep)) {
- LOCK_MUTEX(db_rep->mutex);
- locked = TRUE;
- } else
- locked = FALSE;
-
- /* Initialize for empty list or error return. */
- ret = 0;
- *countp = 0;
- *listp = NULL;
-
- /* First, add up how much memory we need for the host names. */
- if ((count = db_rep->site_cnt) == 0)
- goto err;
-
- array_size = sizeof(DB_REPMGR_SITE) * count;
- total_size = array_size;
- for (i = 0; i < count; i++) {
- site = &db_rep->sites[i];
-
- /* Make room for the NUL terminating byte. */
- total_size += strlen(site->net_addr.host) + 1;
- }
-
- if ((ret = __os_umalloc(dbenv, total_size, &status)) != 0)
- goto err;
-
- /*
- * Put the storage for the host names after the array of structs. This
- * way, the caller can free the whole thing in one single operation.
- */
- name = (char *)((u_int8_t *)status + array_size);
- for (i = 0; i < count; i++) {
- site = &db_rep->sites[i];
-
- status[i].eid = EID_FROM_SITE(site);
-
- status[i].host = name;
- (void)strcpy(name, site->net_addr.host);
- name += strlen(name) + 1;
-
- status[i].port = site->net_addr.port;
- status[i].status = site->state == SITE_CONNECTED ?
- DB_REPMGR_CONNECTED : DB_REPMGR_DISCONNECTED;
- }
-
- *countp = count;
- *listp = status;
-
-err: if (locked)
- UNLOCK_MUTEX(db_rep->mutex);
- return (ret);
-}
diff --git a/db/repmgr/repmgr_stub.c b/db/repmgr/repmgr_stub.c
deleted file mode 100644
index 1f74f6f01..000000000
--- a/db/repmgr/repmgr_stub.c
+++ /dev/null
@@ -1,198 +0,0 @@
-/*-
- * See the file LICENSE for redistribution information.
- *
- * Copyright (c) 1996,2007 Oracle. All rights reserved.
- *
- * $Id: repmgr_stub.c,v 1.7 2007/05/17 15:15:51 bostic Exp $
- */
-
-#ifndef HAVE_REPLICATION_THREADS
-#include "db_config.h"
-
-#include "db_int.h"
-
-/*
- * If the library wasn't compiled with replication support, various routines
- * aren't available. Stub them here, returning an appropriate error.
- */
-static int __db_norepmgr __P((DB_ENV *));
-
-/*
- * __db_norepmgr --
- * Error when a Berkeley DB build doesn't include replication mgr support.
- */
-static int
-__db_norepmgr(dbenv)
- DB_ENV *dbenv;
-{
- __db_errx(dbenv,
- "library build did not include support for the Replication Manager");
- return (DB_OPNOTSUP);
-}
-
-/*
- * PUBLIC: #ifndef HAVE_REPLICATION_THREADS
- * PUBLIC: int __repmgr_close __P((DB_ENV *));
- * PUBLIC: #endif
- */
-int
-__repmgr_close(dbenv)
- DB_ENV *dbenv;
-{
- COMPQUIET(dbenv, NULL);
- return (0);
-}
-
-/*
- * PUBLIC: #ifndef HAVE_REPLICATION_THREADS
- * PUBLIC: int __repmgr_add_remote_site
- * PUBLIC: __P((DB_ENV *, const char *, u_int, int *, u_int32_t));
- * PUBLIC: #endif
- */
-int
-__repmgr_add_remote_site(dbenv, host, port, eidp, flags)
- DB_ENV *dbenv;
- const char *host;
- u_int port;
- int *eidp;
- u_int32_t flags;
-{
- COMPQUIET(host, NULL);
- COMPQUIET(port, 0);
- COMPQUIET(eidp, NULL);
- COMPQUIET(flags, 0);
- return (__db_norepmgr(dbenv));
-}
-
-/*
- * PUBLIC: #ifndef HAVE_REPLICATION_THREADS
- * PUBLIC: int __repmgr_get_ack_policy __P((DB_ENV *, int *));
- * PUBLIC: #endif
- */
-int
-__repmgr_get_ack_policy(dbenv, policy)
- DB_ENV *dbenv;
- int *policy;
-{
- COMPQUIET(policy, NULL);
- return (__db_norepmgr(dbenv));
-}
-
-/*
- * PUBLIC: #ifndef HAVE_REPLICATION_THREADS
- * PUBLIC: int __repmgr_set_ack_policy __P((DB_ENV *, int));
- * PUBLIC: #endif
- */
-int
-__repmgr_set_ack_policy(dbenv, policy)
- DB_ENV *dbenv;
- int policy;
-{
- COMPQUIET(policy, 0);
- return (__db_norepmgr(dbenv));
-}
-
-/*
- * PUBLIC: #ifndef HAVE_REPLICATION_THREADS
- * PUBLIC: int __repmgr_set_local_site
- * PUBLIC: __P((DB_ENV *, const char *, u_int, u_int32_t));
- * PUBLIC: #endif
- */
-int
-__repmgr_set_local_site(dbenv, host, port, flags)
- DB_ENV *dbenv;
- const char *host;
- u_int port;
- u_int32_t flags;
-{
- COMPQUIET(host, NULL);
- COMPQUIET(port, 0);
- COMPQUIET(flags, 0);
- return (__db_norepmgr(dbenv));
-}
-
-/*
- * PUBLIC: #ifndef HAVE_REPLICATION_THREADS
- * PUBLIC: int __repmgr_site_list __P((DB_ENV *, u_int *, DB_REPMGR_SITE **));
- * PUBLIC: #endif
- */
-int
-__repmgr_site_list(dbenv, countp, listp)
- DB_ENV *dbenv;
- u_int *countp;
- DB_REPMGR_SITE **listp;
-{
- COMPQUIET(countp, NULL);
- COMPQUIET(listp, NULL);
- return (__db_norepmgr(dbenv));
-}
-
-/*
- * PUBLIC: #ifndef HAVE_REPLICATION_THREADS
- * PUBLIC: int __repmgr_start __P((DB_ENV *, int, u_int32_t));
- * PUBLIC: #endif
- */
-int
-__repmgr_start(dbenv, nthreads, flags)
- DB_ENV *dbenv;
- int nthreads;
- u_int32_t flags;
-{
- COMPQUIET(nthreads, 0);
- COMPQUIET(flags, 0);
- return (__db_norepmgr(dbenv));
-}
-
-/*
- * PUBLIC: #ifndef HAVE_REPLICATION_THREADS
- * PUBLIC: int __repmgr_stat_pp __P((DB_ENV *, DB_REPMGR_STAT **, u_int32_t));
- * PUBLIC: #endif
- */
-int
-__repmgr_stat_pp(dbenv, statp, flags)
- DB_ENV *dbenv;
- DB_REPMGR_STAT **statp;
- u_int32_t flags;
-{
- COMPQUIET(statp, NULL);
- COMPQUIET(flags, 0);
- return (__db_norepmgr(dbenv));
-}
-
-/*
- * PUBLIC: #ifndef HAVE_REPLICATION_THREADS
- * PUBLIC: int __repmgr_stat_print_pp __P((DB_ENV *, u_int32_t));
- * PUBLIC: #endif
- */
-int
-__repmgr_stat_print_pp(dbenv, flags)
- DB_ENV *dbenv;
- u_int32_t flags;
-{
- COMPQUIET(flags, 0);
- return (__db_norepmgr(dbenv));
-}
-
-/*
- * PUBLIC: #ifndef HAVE_REPLICATION_THREADS
- * PUBLIC: int __repmgr_handle_event __P((DB_ENV *, u_int32_t, void *));
- * PUBLIC: #endif
- */
-int
-__repmgr_handle_event(dbenv, event, info)
- DB_ENV *dbenv;
- u_int32_t event;
- void *info;
-{
- COMPQUIET(dbenv, NULL);
- COMPQUIET(event, 0);
- COMPQUIET(info, NULL);
-
- /*
- * It's not an error for this function to be called. Replication calls
- * this to let repmgr handle events. If repmgr isn't part of the build,
- * all replication events should be forwarded to the application.
- */
- return (DB_EVENT_NOT_HANDLED);
-}
-#endif /* !HAVE_REPLICATION_THREADS */
diff --git a/db/repmgr/repmgr_util.c b/db/repmgr/repmgr_util.c
deleted file mode 100644
index b6cbf3c9c..000000000
--- a/db/repmgr/repmgr_util.c
+++ /dev/null
@@ -1,426 +0,0 @@
-/*-
- * See the file LICENSE for redistribution information.
- *
- * Copyright (c) 2005,2007 Oracle. All rights reserved.
- *
- * $Id: repmgr_util.c,v 1.34 2007/06/11 18:29:34 alanb Exp $
- */
-
-#include "db_config.h"
-
-#define __INCLUDE_NETWORKING 1
-#include "db_int.h"
-
-/*
- * Schedules a future attempt to re-establish a connection with the given site.
- * Usually, we wait the configured retry_wait period. But if the "immediate"
- * parameter is given as TRUE, we'll make the wait time 0, and put the request
- * at the _beginning_ of the retry queue. Note how this allows us to preserve
- * the property that the queue stays in time order simply by appending to the
- * end.
- *
- * PUBLIC: int __repmgr_schedule_connection_attempt __P((DB_ENV *, u_int, int));
- *
- * !!!
- * Caller should hold mutex.
- *
- * Unless an error occurs, we always attempt to wake the main thread;
- * __repmgr_bust_connection relies on this behavior.
- */
-int
-__repmgr_schedule_connection_attempt(dbenv, eid, immediate)
- DB_ENV *dbenv;
- u_int eid;
- int immediate;
-{
- DB_REP *db_rep;
- REPMGR_SITE *site;
- REPMGR_RETRY *retry;
- db_timespec t, v;
- int ret;
-
- db_rep = dbenv->rep_handle;
- if ((ret = __os_malloc(dbenv, sizeof(*retry), &retry)) != 0)
- return (ret);
-
- __os_gettime(dbenv, &t);
- if (immediate)
- TAILQ_INSERT_HEAD(&db_rep->retries, retry, entries);
- else {
- DB_TIMEOUT_TO_TIMESPEC(db_rep->connection_retry_wait, &v);
- timespecadd(&t, &v);
- TAILQ_INSERT_TAIL(&db_rep->retries, retry, entries);
- }
- retry->eid = eid;
- retry->time = t;
-
- site = SITE_FROM_EID(eid);
- site->state = SITE_IDLE;
- site->ref.retry = retry;
-
- return (__repmgr_wake_main_thread(dbenv));
-}
-
-/*
- * Initialize the necessary control structures to begin reading a new input
- * message.
- *
- * PUBLIC: void __repmgr_reset_for_reading __P((REPMGR_CONNECTION *));
- */
-void
-__repmgr_reset_for_reading(con)
- REPMGR_CONNECTION *con;
-{
- con->reading_phase = SIZES_PHASE;
- __repmgr_iovec_init(&con->iovecs);
- __repmgr_add_buffer(&con->iovecs, &con->msg_type,
- sizeof(con->msg_type));
- __repmgr_add_buffer(&con->iovecs, &con->control_size_buf,
- sizeof(con->control_size_buf));
- __repmgr_add_buffer(&con->iovecs, &con->rec_size_buf,
- sizeof(con->rec_size_buf));
-}
-
-/*
- * Constructs a DB_REPMGR_CONNECTION structure, and puts it on the main list of
- * connections. It does not initialize eid, since that isn't needed and/or
- * immediately known in all cases.
- *
- * PUBLIC: int __repmgr_new_connection __P((DB_ENV *, REPMGR_CONNECTION **,
- * PUBLIC: socket_t, u_int32_t));
- */
-int
-__repmgr_new_connection(dbenv, connp, s, flags)
- DB_ENV *dbenv;
- REPMGR_CONNECTION **connp;
- socket_t s;
- u_int32_t flags;
-{
- DB_REP *db_rep;
- REPMGR_CONNECTION *c;
- int ret;
-
- db_rep = dbenv->rep_handle;
- if ((ret = __os_malloc(dbenv, sizeof(REPMGR_CONNECTION), &c)) != 0)
- return (ret);
-
- c->fd = s;
- c->flags = flags;
-
- STAILQ_INIT(&c->outbound_queue);
- c->out_queue_length = 0;
-
- __repmgr_reset_for_reading(c);
- TAILQ_INSERT_TAIL(&db_rep->connections, c, entries);
- *connp = c;
-
- return (0);
-}
-
-/*
- * PUBLIC: int __repmgr_new_site __P((DB_ENV *, REPMGR_SITE**,
- * PUBLIC: const repmgr_netaddr_t *, int));
- *
- * !!!
- * Caller must hold mutex.
- */
-int
-__repmgr_new_site(dbenv, sitep, addr, state)
- DB_ENV *dbenv;
- REPMGR_SITE **sitep;
- const repmgr_netaddr_t *addr;
- int state;
-{
- DB_REP *db_rep;
- REPMGR_SITE *site;
- SITE_STRING_BUFFER buffer;
- u_int new_site_max, eid;
- int ret;
-
- db_rep = dbenv->rep_handle;
- if (db_rep->site_cnt >= db_rep->site_max) {
-#define INITIAL_SITES_ALLOCATION 10 /* Arbitrary guess. */
- new_site_max = db_rep->site_max == 0 ?
- INITIAL_SITES_ALLOCATION : db_rep->site_max * 2;
- if ((ret = __os_realloc(dbenv,
- sizeof(REPMGR_SITE) * new_site_max, &db_rep->sites)) != 0)
- return (ret);
- db_rep->site_max = new_site_max;
- }
- eid = db_rep->site_cnt++;
-
- site = &db_rep->sites[eid];
-
- memcpy(&site->net_addr, addr, sizeof(*addr));
- ZERO_LSN(site->max_ack);
- site->priority = -1; /* OOB value indicates we don't yet know. */
- site->state = state;
-
- RPRINT(dbenv, (dbenv, "EID %u is assigned for %s", eid,
- __repmgr_format_site_loc(site, buffer)));
- *sitep = site;
- return (0);
-}
-
-/*
- * Destructor for a repmgr_netaddr_t, cleans up any allocated memory pointed to
- * by the addr.
- *
- * PUBLIC: void __repmgr_cleanup_netaddr __P((DB_ENV *, repmgr_netaddr_t *));
- */
-void
-__repmgr_cleanup_netaddr(dbenv, addr)
- DB_ENV *dbenv;
- repmgr_netaddr_t *addr;
-{
- if (addr->address_list != NULL) {
- __db_freeaddrinfo(dbenv, addr->address_list);
- addr->address_list = addr->current = NULL;
- }
- if (addr->host != NULL) {
- __os_free(dbenv, addr->host);
- addr->host = NULL;
- }
-}
-
-/*
- * PUBLIC: void __repmgr_iovec_init __P((REPMGR_IOVECS *));
- */
-void
-__repmgr_iovec_init(v)
- REPMGR_IOVECS *v;
-{
- v->offset = v->count = 0;
- v->total_bytes = 0;
-}
-
-/*
- * PUBLIC: void __repmgr_add_buffer __P((REPMGR_IOVECS *, void *, size_t));
- *
- * !!!
- * There is no checking for overflow of the vectors[5] array.
- */
-void
-__repmgr_add_buffer(v, address, length)
- REPMGR_IOVECS *v;
- void *address;
- size_t length;
-{
- v->vectors[v->count].iov_base = address;
- v->vectors[v->count++].iov_len = length;
- v->total_bytes += length;
-}
-
-/*
- * PUBLIC: void __repmgr_add_dbt __P((REPMGR_IOVECS *, const DBT *));
- */
-void
-__repmgr_add_dbt(v, dbt)
- REPMGR_IOVECS *v;
- const DBT *dbt;
-{
- v->vectors[v->count].iov_base = dbt->data;
- v->vectors[v->count++].iov_len = dbt->size;
- v->total_bytes += dbt->size;
-}
-
-/*
- * Update a set of iovecs to reflect the number of bytes transferred in an I/O
- * operation, so that the iovecs can be used to continue transferring where we
- * left off.
- * Returns TRUE if the set of buffers is now fully consumed, FALSE if more
- * remains.
- *
- * PUBLIC: int __repmgr_update_consumed __P((REPMGR_IOVECS *, size_t));
- */
-int
-__repmgr_update_consumed(v, byte_count)
- REPMGR_IOVECS *v;
- size_t byte_count;
-{
- db_iovec_t *iov;
- int i;
-
- for (i = v->offset; ; i++) {
- DB_ASSERT(NULL, i < v->count && byte_count > 0);
- iov = &v->vectors[i];
- if (byte_count > iov->iov_len) {
- /*
- * We've consumed (more than) this vector's worth.
- * Adjust count and continue.
- */
- byte_count -= iov->iov_len;
- } else {
- /* Adjust length of remaining portion of vector. */
- iov->iov_len -= byte_count;
- if (iov->iov_len > 0) {
- /*
- * Still some left in this vector. Adjust base
- * address too, and leave offset pointing here.
- */
- iov->iov_base = (void *)
- ((u_int8_t *)iov->iov_base + byte_count);
- v->offset = i;
- } else {
- /*
- * Consumed exactly to a vector boundary.
- * Advance to next vector for next time.
- */
- v->offset = i+1;
- }
- /*
- * If offset has reached count, the entire thing is
- * consumed.
- */
- return (v->offset >= v->count);
- }
- }
-}
-
-/*
- * Builds a buffer containing our network address information, suitable for
- * publishing as cdata via a call to rep_start, and sets up the given DBT to
- * point to it. The buffer is dynamically allocated memory, and the caller must
- * assume responsibility for it.
- *
- * PUBLIC: int __repmgr_prepare_my_addr __P((DB_ENV *, DBT *));
- */
-int
-__repmgr_prepare_my_addr(dbenv, dbt)
- DB_ENV *dbenv;
- DBT *dbt;
-{
- DB_REP *db_rep;
- size_t size, hlen;
- u_int16_t port_buffer;
- u_int8_t *ptr;
- int ret;
-
- db_rep = dbenv->rep_handle;
-
- /*
- * The cdata message consists of the 2-byte port number, in network byte
- * order, followed by the null-terminated host name string.
- */
- port_buffer = htons(db_rep->my_addr.port);
- size = sizeof(port_buffer) +
- (hlen = strlen(db_rep->my_addr.host) + 1);
- if ((ret = __os_malloc(dbenv, size, &ptr)) != 0)
- return (ret);
-
- DB_INIT_DBT(*dbt, ptr, size);
-
- memcpy(ptr, &port_buffer, sizeof(port_buffer));
- ptr = &ptr[sizeof(port_buffer)];
- memcpy(ptr, db_rep->my_addr.host, hlen);
-
- return (0);
-}
-
-/*
- * Provide the appropriate value for nsites, the number of sites in the
- * replication group. If the application has specified a value, use that.
- * Otherwise, just use the number of sites we know of.
- *
- * PUBLIC: u_int __repmgr_get_nsites __P((DB_REP *));
- */
-u_int
-__repmgr_get_nsites(db_rep)
- DB_REP *db_rep;
-{
- if (db_rep->config_nsites > 0)
- return ((u_int)db_rep->config_nsites);
-
- /*
- * The number of other sites in our table, plus 1 to count ourself.
- */
- return (db_rep->site_cnt + 1);
-}
-
-/*
- * PUBLIC: void __repmgr_thread_failure __P((DB_ENV *, int));
- */
-void
-__repmgr_thread_failure(dbenv, why)
- DB_ENV *dbenv;
- int why;
-{
- (void)__repmgr_stop_threads(dbenv);
- (void)__db_panic(dbenv, why);
-}
-
-/*
- * Format a printable representation of a site location, suitable for inclusion
- * in an error message. The buffer must be at least as big as
- * MAX_SITE_LOC_STRING.
- *
- * PUBLIC: char *__repmgr_format_eid_loc __P((DB_REP *, int, char *));
- */
-char *
-__repmgr_format_eid_loc(db_rep, eid, buffer)
- DB_REP *db_rep;
- int eid;
- char *buffer;
-{
- if (IS_VALID_EID(eid))
- return (__repmgr_format_site_loc(SITE_FROM_EID(eid), buffer));
-
- snprintf(buffer, MAX_SITE_LOC_STRING, "(unidentified site)");
- return (buffer);
-}
-
-/*
- * PUBLIC: char *__repmgr_format_site_loc __P((REPMGR_SITE *, char *));
- */
-char *
-__repmgr_format_site_loc(site, buffer)
- REPMGR_SITE *site;
- char *buffer;
-{
- snprintf(buffer, MAX_SITE_LOC_STRING, "site %s:%lu",
- site->net_addr.host, (u_long)site->net_addr.port);
- return (buffer);
-}
-
-/*
- * __repmgr_timespec_diff_now --
- * Calculate the time duration from now til "when".
- *
- * PUBLIC: void __repmgr_timespec_diff_now
- * PUBLIC: __P((DB_ENV *, db_timespec *, db_timespec *));
- */
-void
-__repmgr_timespec_diff_now(dbenv, when, result)
- DB_ENV *dbenv;
- db_timespec *when, *result;
-{
- db_timespec now;
-
- __os_gettime(dbenv, &now);
- if (timespeccmp(&now, when, >=))
- timespecclear(result);
- else {
- *result = *when;
- timespecsub(result, &now);
- }
-}
-
-/*
- * PUBLIC: int __repmgr_repstart __P((DB_ENV *, u_int32_t));
- */
-int
-__repmgr_repstart(dbenv, flags)
- DB_ENV *dbenv;
- u_int32_t flags;
-{
- DBT my_addr;
- int ret;
-
- if ((ret = __repmgr_prepare_my_addr(dbenv, &my_addr)) != 0)
- return (ret);
- ret = __rep_start(dbenv, &my_addr, flags);
- __os_free(dbenv, my_addr.data);
- if (ret != 0)
- __db_err(dbenv, ret, "rep_start");
- return (ret);
-}
diff --git a/db/repmgr/repmgr_windows.c b/db/repmgr/repmgr_windows.c
deleted file mode 100644
index c48f94e43..000000000
--- a/db/repmgr/repmgr_windows.c
+++ /dev/null
@@ -1,715 +0,0 @@
-/*-
- * See the file LICENSE for redistribution information.
- *
- * Copyright (c) 2005,2007 Oracle. All rights reserved.
- *
- * $Id: repmgr_windows.c,v 1.22 2007/06/11 18:29:34 alanb Exp $
- */
-
-#include "db_config.h"
-
-#define __INCLUDE_NETWORKING 1
-#include "db_int.h"
-
-typedef struct __ack_waiter {
- HANDLE event;
- const DB_LSN *lsnp;
- struct __ack_waiter *next_free;
-} ACK_WAITER;
-
-#define WAITER_SLOT_IN_USE(w) ((w)->lsnp != NULL)
-
-/*
- * Array slots [0:next_avail-1] are initialized, and either in use or on the
- * free list. Slots beyond that are virgin territory, whose memory contents
- * could be garbage. In particular, note that slots [0:next_avail-1] have a
- * Win32 Event Object created for them, which have to be freed when cleaning up
- * this data structure.
- *
- * "first_free" points to a list of not-in-use slots threaded through the first
- * section of the array.
- */
-struct __ack_waiters_table {
- struct __ack_waiter *array;
- int size;
- int next_avail;
- struct __ack_waiter *first_free;
-};
-
-static int allocate_wait_slot __P((DB_ENV *, ACK_WAITER **));
-static void free_wait_slot __P((DB_ENV *, ACK_WAITER *));
-static int handle_completion __P((DB_ENV *, REPMGR_CONNECTION *));
-static int finish_connecting __P((DB_ENV *, REPMGR_CONNECTION *,
- LPWSANETWORKEVENTS));
-
-int
-__repmgr_thread_start(dbenv, runnable)
- DB_ENV *dbenv;
- REPMGR_RUNNABLE *runnable;
-{
- HANDLE thread_id;
-
- runnable->finished = FALSE;
-
- thread_id = CreateThread(NULL, 0,
- (LPTHREAD_START_ROUTINE)runnable->run, dbenv, 0, NULL);
- if (thread_id == NULL)
- return (GetLastError());
- runnable->thread_id = thread_id;
- return (0);
-}
-
-int
-__repmgr_thread_join(thread)
- REPMGR_RUNNABLE *thread;
-{
- if (WaitForSingleObject(thread->thread_id, INFINITE) == WAIT_OBJECT_0)
- return (0);
- return (GetLastError());
-}
-
-int
-__repmgr_set_nonblocking(s)
- SOCKET s;
-{
- int ret;
- u_long arg;
-
- arg = 1; /* any non-zero value */
- if ((ret = ioctlsocket(s, FIONBIO, &arg)) == SOCKET_ERROR)
- return (WSAGetLastError());
- return (0);
-}
-
-/*
- * Wake any send()-ing threads waiting for an acknowledgement.
- *
- * !!!
- * Caller must hold the repmgr->mutex, if this thread synchronization is to work
- * properly.
- */
-int
-__repmgr_wake_waiting_senders(dbenv)
- DB_ENV *dbenv;
-{
- DB_REP *db_rep;
- ACK_WAITER *slot;
- int i, ret;
-
- ret = 0;
- db_rep = dbenv->rep_handle;
- for (i=0; i<db_rep->waiters->next_avail; i++) {
- slot = &db_rep->waiters->array[i];
- if (!WAITER_SLOT_IN_USE(slot))
- continue;
- if (__repmgr_is_permanent(dbenv, slot->lsnp))
- if (!SetEvent(slot->event) && ret == 0)
- ret = GetLastError();
- }
- return (ret);
-}
-
-/*
- * !!!
- * Caller must hold mutex.
- */
-int
-__repmgr_await_ack(dbenv, lsnp)
- DB_ENV *dbenv;
- const DB_LSN *lsnp;
-{
- DB_REP *db_rep;
- ACK_WAITER *me;
- DWORD ret;
- DWORD timeout;
-
- db_rep = dbenv->rep_handle;
-
- if ((ret = allocate_wait_slot(dbenv, &me)) != 0)
- goto err;
-
- /* convert time-out from microseconds to milliseconds, rounding up */
- timeout = db_rep->ack_timeout > 0 ?
- ((db_rep->ack_timeout + (US_PER_MS - 1)) / US_PER_MS) : INFINITE;
- me->lsnp = lsnp;
- if ((ret = SignalObjectAndWait(db_rep->mutex, me->event, timeout,
- FALSE)) == WAIT_FAILED) {
- ret = GetLastError();
- } else if (ret == WAIT_TIMEOUT)
- ret = DB_REP_UNAVAIL;
- else
- DB_ASSERT(dbenv, ret == WAIT_OBJECT_0);
-
- LOCK_MUTEX(db_rep->mutex);
- free_wait_slot(dbenv, me);
-
-err:
- return (ret);
-}
-
-/*
- * !!!
- * Caller must hold the mutex.
- */
-static int
-allocate_wait_slot(dbenv, resultp)
- DB_ENV *dbenv;
- ACK_WAITER **resultp;
-{
- DB_REP *db_rep;
- ACK_WAITERS_TABLE *table;
- ACK_WAITER *w;
- int ret;
-
- db_rep = dbenv->rep_handle;
- table = db_rep->waiters;
- if (table->first_free == NULL) {
- if (table->next_avail >= table->size) {
- /*
- * Grow the array.
- */
- table->size *= 2;
- w = table->array;
- if ((ret = __os_realloc(dbenv, table->size * sizeof(*w),
- &w)) != 0)
- return (ret);
- table->array = w;
- }
- /*
- * Here if, one way or another, we're good to go for using the
- * next slot (for the first time).
- */
- w = &table->array[table->next_avail++];
- if ((w->event = CreateEvent(NULL, FALSE, FALSE, NULL)) ==
- NULL) {
- /*
- * Maintain the sanctity of our rule that
- * [0:next_avail-1] contain valid Event Objects.
- */
- --table->next_avail;
- return (GetLastError());
- }
- } else {
- w = table->first_free;
- table->first_free = w->next_free;
- }
- *resultp = w;
- return (0);
-}
-
-static void
-free_wait_slot(dbenv, slot)
- DB_ENV *dbenv;
- ACK_WAITER *slot;
-{
- DB_REP *db_rep;
-
- db_rep = dbenv->rep_handle;
-
- slot->lsnp = NULL; /* show it's not in use */
- slot->next_free = db_rep->waiters->first_free;
- db_rep->waiters->first_free = slot;
-}
-
-/*
- * Make resource allocation an all-or-nothing affair, outside of this and the
- * close_sync function. db_rep->waiters should be non-NULL iff all of these
- * resources have been created.
- */
-int
-__repmgr_init_sync(dbenv, db_rep)
- DB_ENV *dbenv;
- DB_REP *db_rep;
-{
-#define INITIAL_ALLOCATION 5 /* arbitrary size */
- ACK_WAITERS_TABLE *table;
- int ret;
-
- db_rep->signaler = db_rep->queue_nonempty = db_rep->check_election =
- db_rep->mutex = NULL;
- table = NULL;
-
- if ((db_rep->signaler = CreateEvent(NULL, /* security attr */
- FALSE, /* (not) of the manual reset variety */
- FALSE, /* (not) initially signaled */
- NULL)) == NULL) /* name */
- goto geterr;
-
- if ((db_rep->queue_nonempty = CreateEvent(NULL, TRUE, FALSE, NULL))
- == NULL)
- goto geterr;
-
- if ((db_rep->check_election = CreateEvent(NULL, FALSE, FALSE, NULL))
- == NULL)
- goto geterr;
-
- if ((db_rep->mutex = CreateMutex(NULL, FALSE, NULL)) == NULL)
- goto geterr;
-
- if ((ret = __os_calloc(dbenv, 1, sizeof(ACK_WAITERS_TABLE), &table))
- != 0)
- goto err;
-
- if ((ret = __os_calloc(dbenv, INITIAL_ALLOCATION, sizeof(ACK_WAITER),
- &table->array)) != 0)
- goto err;
-
- table->size = INITIAL_ALLOCATION;
- table->first_free = NULL;
- table->next_avail = 0;
-
- /* There's a restaurant joke in there somewhere. */
- db_rep->waiters = table;
- return (0);
-
-geterr:
- ret = GetLastError();
-err:
- if (db_rep->check_election != NULL)
- CloseHandle(db_rep->check_election);
- if (db_rep->queue_nonempty != NULL)
- CloseHandle(db_rep->queue_nonempty);
- if (db_rep->signaler != NULL)
- CloseHandle(db_rep->signaler);
- if (db_rep->mutex != NULL)
- CloseHandle(db_rep->mutex);
- if (table != NULL)
- __os_free(dbenv, table);
- db_rep->waiters = NULL;
- return (ret);
-}
-
-int
-__repmgr_close_sync(dbenv)
- DB_ENV *dbenv;
-{
- DB_REP *db_rep;
- int i, ret;
-
- db_rep = dbenv->rep_handle;
- if (!(REPMGR_SYNC_INITED(db_rep)))
- return (0);
-
- ret = 0;
- for (i = 0; i < db_rep->waiters->next_avail; i++) {
- if (!CloseHandle(db_rep->waiters->array[i].event) && ret == 0)
- ret = GetLastError();
- }
- __os_free(dbenv, db_rep->waiters->array);
- __os_free(dbenv, db_rep->waiters);
-
- if (!CloseHandle(db_rep->check_election) && ret == 0)
- ret = GetLastError();
-
- if (!CloseHandle(db_rep->queue_nonempty) && ret == 0)
- ret = GetLastError();
-
- if (!CloseHandle(db_rep->signaler) && ret == 0)
- ret = GetLastError();
-
- if (!CloseHandle(db_rep->mutex) && ret == 0)
- ret = GetLastError();
-
- db_rep->waiters = NULL;
- return (ret);
-}
-
-/*
- * Performs net-related resource initialization other than memory initialization
- * and allocation. A valid db_rep->listen_fd acts as the "all-or-nothing"
- * sentinel signifying that these resources are allocated (except that now the
- * new wsa_inited flag may be used to indicate that WSAStartup has already been
- * called).
- */
-int
-__repmgr_net_init(dbenv, db_rep)
- DB_ENV *dbenv;
- DB_REP *db_rep;
-{
- int ret;
-
- /* Initialize the Windows sockets DLL. */
- if (!db_rep->wsa_inited && (ret = __repmgr_wsa_init(dbenv)) != 0)
- goto err;
-
- if ((ret = __repmgr_listen(dbenv)) == 0)
- return (0);
-
- if (WSACleanup() == SOCKET_ERROR) {
- ret = net_errno;
- __db_err(dbenv, ret, "WSACleanup");
- }
-
-err: db_rep->listen_fd = INVALID_SOCKET;
- return (ret);
-}
-
-/*
- * __repmgr_wsa_init --
- * Initialize the Windows sockets DLL.
- *
- * PUBLIC: int __repmgr_wsa_init __P((DB_ENV *));
- */
-int
-__repmgr_wsa_init(dbenv)
- DB_ENV *dbenv;
-{
- DB_REP *db_rep;
- WSADATA wsaData;
- int ret;
-
- db_rep = dbenv->rep_handle;
-
- if ((ret = WSAStartup(MAKEWORD(2, 2), &wsaData)) != 0) {
- __db_err(dbenv, ret, "unable to initialize Windows networking");
- return (ret);
- }
- db_rep->wsa_inited = TRUE;
-
- return (0);
-}
-
-int
-__repmgr_lock_mutex(mutex)
- mgr_mutex_t *mutex;
-{
- if (WaitForSingleObject(*mutex, INFINITE) == WAIT_OBJECT_0)
- return (0);
- return (GetLastError());
-}
-
-int
-__repmgr_unlock_mutex(mutex)
- mgr_mutex_t *mutex;
-{
- if (ReleaseMutex(*mutex))
- return (0);
- return (GetLastError());
-}
-
-int
-__repmgr_signal(v)
- cond_var_t *v;
-{
- return (SetEvent(*v) ? 0 : GetLastError());
-}
-
-int
-__repmgr_wake_main_thread(dbenv)
- DB_ENV *dbenv;
-{
- if (!SetEvent(dbenv->rep_handle->signaler))
- return (GetLastError());
- return (0);
-}
-
-int
-__repmgr_writev(fd, iovec, buf_count, byte_count_p)
- socket_t fd;
- db_iovec_t *iovec;
- int buf_count;
- size_t *byte_count_p;
-{
- DWORD bytes;
-
- if (WSASend(fd, iovec,
- (DWORD)buf_count, &bytes, 0, NULL, NULL) == SOCKET_ERROR)
- return (net_errno);
-
- *byte_count_p = (size_t)bytes;
- return (0);
-}
-
-int
-__repmgr_readv(fd, iovec, buf_count, xfr_count_p)
- socket_t fd;
- db_iovec_t *iovec;
- int buf_count;
- size_t *xfr_count_p;
-{
- DWORD bytes, flags;
-
- flags = 0;
- if (WSARecv(fd, iovec,
- (DWORD)buf_count, &bytes, &flags, NULL, NULL) == SOCKET_ERROR)
- return (net_errno);
-
- *xfr_count_p = (size_t)bytes;
- return (0);
-}
-
-int
-__repmgr_select_loop(dbenv)
- DB_ENV *dbenv;
-{
- DWORD select_timeout;
- DB_REP *db_rep;
- REPMGR_CONNECTION *conn, *next;
- REPMGR_RETRY *retry;
- WSAEVENT events[WSA_MAXIMUM_WAIT_EVENTS];
- REPMGR_CONNECTION *connections[WSA_MAXIMUM_WAIT_EVENTS];
- DWORD nevents, ret;
- db_timespec timeout;
- WSAEVENT listen_event;
- WSANETWORKEVENTS net_events;
- int flow_control, i;
-
- db_rep = dbenv->rep_handle;
-
- if ((listen_event = WSACreateEvent()) == WSA_INVALID_EVENT) {
- __db_err(
- dbenv, net_errno, "can't create event for listen socket");
- return (net_errno);
- }
- if (WSAEventSelect(db_rep->listen_fd, listen_event, FD_ACCEPT) ==
- SOCKET_ERROR) {
- ret = net_errno;
- __db_err(dbenv, ret, "can't enable event for listener");
- goto out;
- }
-
- LOCK_MUTEX(db_rep->mutex);
- if ((ret = __repmgr_first_try_connections(dbenv)) != 0)
- goto unlock;
- flow_control = FALSE;
- for (;;) {
- /* Start with the two events that we always wait for. */
- events[0] = db_rep->signaler;
- events[1] = listen_event;
- nevents = 2;
-
- /*
- * Add an event for each surviving socket that we're interested
- * in. (For now [until we implement flow control], that's all
- * of them, in one form or another.)
- * Note that even if we're suffering flow control, we
- * nevertheless still read if we haven't even yet gotten a
- * handshake. Why? (1) Handshakes are important; and (2) they
- * don't hurt anything flow-control-wise.
- */
- TAILQ_FOREACH(conn, &db_rep->connections, entries) {
- if (F_ISSET(conn, CONN_CONNECTING) ||
- !STAILQ_EMPTY(&conn->outbound_queue) ||
- (!flow_control || !IS_VALID_EID(conn->eid))) {
- events[nevents] = conn->event_object;
- connections[nevents++] = conn;
- }
- }
-
- /*
- * Decide how long to wait based on when it will next be time to
- * retry an idle connection. (List items are in order, so we
- * only have to examine the first one.)
- */
- if (TAILQ_EMPTY(&db_rep->retries))
- select_timeout = WSA_INFINITE;
- else {
- retry = TAILQ_FIRST(&db_rep->retries);
-
- __repmgr_timespec_diff_now(
- dbenv, &retry->time, &timeout);
- select_timeout =
- (DWORD)(timeout.tv_sec * MS_PER_SEC +
- timeout.tv_nsec / NS_PER_MS);
- }
-
- UNLOCK_MUTEX(db_rep->mutex);
- ret = WSAWaitForMultipleEvents(
- nevents, events, FALSE, select_timeout, FALSE);
- if (db_rep->finished) {
- ret = 0;
- goto out;
- }
- LOCK_MUTEX(db_rep->mutex);
-
- /*
- * The first priority thing we must do is to clean up any
- * pending defunct connections. Otherwise, if they have any
- * lingering pending input, we get very confused if we try to
- * process it.
- * Loop just like TAILQ_FOREACH, except that we need to be
- * able to unlink a list entry.
- */
- for (conn = TAILQ_FIRST(&db_rep->connections);
- conn != NULL;
- conn = next) {
- next = TAILQ_NEXT(conn, entries);
- if (F_ISSET(conn, CONN_DEFUNCT))
- __repmgr_cleanup_connection(dbenv, conn);
- }
-
- /*
- * !!!
- * Note that `ret' remains set as the return code from
- * WSAWaitForMultipleEvents, above.
- */
- if (ret >= WSA_WAIT_EVENT_0 &&
- ret < WSA_WAIT_EVENT_0 + nevents) {
- switch (i = ret - WSA_WAIT_EVENT_0) {
- case 0:
- /* Another thread woke us. */
- break;
- case 1:
- if ((ret = WSAEnumNetworkEvents(
- db_rep->listen_fd, listen_event,
- &net_events)) == SOCKET_ERROR) {
- ret = net_errno;
- goto unlock;
- }
- DB_ASSERT(dbenv,
- net_events.lNetworkEvents & FD_ACCEPT);
- if ((ret = net_events.iErrorCode[FD_ACCEPT_BIT])
- != 0)
- goto unlock;
- if ((ret = __repmgr_accept(dbenv)) != 0)
- goto unlock;
- break;
- default:
- if ((ret = handle_completion(dbenv,
- connections[i])) != 0)
- goto unlock;
- break;
- }
- } else if (ret == WSA_WAIT_TIMEOUT) {
- if ((ret = __repmgr_retry_connections(dbenv)) != 0)
- goto unlock;
- } else if (ret == WSA_WAIT_FAILED) {
- ret = net_errno;
- goto unlock;
- }
- }
-
-unlock:
- UNLOCK_MUTEX(db_rep->mutex);
-out:
- if (!CloseHandle(listen_event) && ret == 0)
- ret = GetLastError();
- return (ret);
-}
-
-/*
- * !!!
- * Only ever called on the select() thread, since we may call
- * __repmgr_bust_connection(..., TRUE).
- */
-static int
-handle_completion(dbenv, conn)
- DB_ENV *dbenv;
- REPMGR_CONNECTION *conn;
-{
- int ret;
- WSANETWORKEVENTS events;
-
- if ((ret = WSAEnumNetworkEvents(conn->fd, conn->event_object, &events))
- == SOCKET_ERROR) {
- __db_err(dbenv, net_errno, "EnumNetworkEvents");
- STAT(dbenv->rep_handle->region->mstat.st_connection_drop++);
- ret = DB_REP_UNAVAIL;
- goto err;
- }
-
- if (F_ISSET(conn, CONN_CONNECTING)) {
- if ((ret = finish_connecting(dbenv, conn, &events)) != 0)
- goto err;
- } else { /* Check both writing and reading. */
- if (events.lNetworkEvents & FD_CLOSE) {
- __db_err(dbenv,
- events.iErrorCode[FD_CLOSE_BIT],
- "connection closed");
- STAT(dbenv->rep_handle->
- region->mstat.st_connection_drop++);
- ret = DB_REP_UNAVAIL;
- goto err;
- }
-
- if (events.lNetworkEvents & FD_WRITE) {
- if (events.iErrorCode[FD_WRITE_BIT] != 0) {
- __db_err(dbenv,
- events.iErrorCode[FD_WRITE_BIT],
- "error writing");
- STAT(dbenv->rep_handle->
- region->mstat.st_connection_drop++);
- ret = DB_REP_UNAVAIL;
- goto err;
- } else if ((ret =
- __repmgr_write_some(dbenv, conn)) != 0)
- goto err;
- }
-
- if (events.lNetworkEvents & FD_READ) {
- if (events.iErrorCode[FD_READ_BIT] != 0) {
- __db_err(dbenv,
- events.iErrorCode[FD_READ_BIT],
- "error reading");
- STAT(dbenv->rep_handle->
- region->mstat.st_connection_drop++);
- ret = DB_REP_UNAVAIL;
- goto err;
- } else if ((ret =
- __repmgr_read_from_site(dbenv, conn)) != 0)
- goto err;
- }
- }
-
- return (0);
-
-err: if (ret == DB_REP_UNAVAIL)
- return (__repmgr_bust_connection(dbenv, conn, TRUE));
- return (ret);
-}
-
-static int
-finish_connecting(dbenv, conn, events)
- DB_ENV *dbenv;
- REPMGR_CONNECTION *conn;
- LPWSANETWORKEVENTS events;
-{
- DB_REP *db_rep;
- u_int eid;
-/* char reason[100]; */
- int ret/*, t_ret*/;
-/* DWORD_PTR values[1]; */
-
- if (!(events->lNetworkEvents & FD_CONNECT))
- return (0);
-
- F_CLR(conn, CONN_CONNECTING);
-
- if ((ret = events->iErrorCode[FD_CONNECT_BIT]) != 0) {
-/* t_ret = FormatMessage( */
-/* FORMAT_MESSAGE_IGNORE_INSERTS | */
-/* FORMAT_MESSAGE_FROM_SYSTEM | */
-/* FORMAT_MESSAGE_ARGUMENT_ARRAY, */
-/* NULL, ret, 0, (LPTSTR)reason, sizeof(reason), values); */
-/* __db_err(dbenv/\*, ret*\/, "connecting: %s", */
-/* reason); */
-/* LocalFree(reason); */
- __db_err(dbenv, ret, "connecting");
- goto err;
- }
-
- if (WSAEventSelect(conn->fd, conn->event_object, FD_READ | FD_CLOSE) ==
- SOCKET_ERROR) {
- ret = net_errno;
- __db_err(dbenv, ret, "setting event bits for reading");
- return (ret);
- }
-
- return (__repmgr_send_handshake(dbenv, conn));
-
-err:
- db_rep = dbenv->rep_handle;
- eid = conn->eid;
- DB_ASSERT(dbenv, IS_VALID_EID(eid));
-
- if (ADDR_LIST_NEXT(&SITE_FROM_EID(eid)->net_addr) == NULL) {
- STAT(db_rep->region->mstat.st_connect_fail++);
- return (DB_REP_UNAVAIL);
- }
-
- DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
- __repmgr_cleanup_connection(dbenv, conn);
- ret = __repmgr_connect_site(dbenv, eid);
- DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL);
- return (ret);
-}