diff options
author | Panu Matilainen <pmatilai@redhat.com> | 2008-03-07 14:05:28 +0200 |
---|---|---|
committer | Panu Matilainen <pmatilai@redhat.com> | 2008-03-07 14:05:28 +0200 |
commit | 2414d522bcfbd0ba9e6115eb7e2a3ba056ac2795 (patch) | |
tree | a995b380eb055a041232681c626cef5af30dccfc /db/repmgr | |
parent | 501197e5ef5ce8687aaf8bd4352f296bb7a5c0e8 (diff) | |
download | librpm-tizen-2414d522bcfbd0ba9e6115eb7e2a3ba056ac2795.tar.gz librpm-tizen-2414d522bcfbd0ba9e6115eb7e2a3ba056ac2795.tar.bz2 librpm-tizen-2414d522bcfbd0ba9e6115eb7e2a3ba056ac2795.zip |
Remove BDB copy from the repository, it doesn't belong there
Diffstat (limited to 'db/repmgr')
-rw-r--r-- | db/repmgr/repmgr_elect.c | 384 | ||||
-rw-r--r-- | db/repmgr/repmgr_method.c | 421 | ||||
-rw-r--r-- | db/repmgr/repmgr_msg.c | 341 | ||||
-rw-r--r-- | db/repmgr/repmgr_net.c | 1075 | ||||
-rw-r--r-- | db/repmgr/repmgr_posix.c | 691 | ||||
-rw-r--r-- | db/repmgr/repmgr_queue.c | 155 | ||||
-rw-r--r-- | db/repmgr/repmgr_sel.c | 875 | ||||
-rw-r--r-- | db/repmgr/repmgr_stat.c | 298 | ||||
-rw-r--r-- | db/repmgr/repmgr_stub.c | 198 | ||||
-rw-r--r-- | db/repmgr/repmgr_util.c | 426 | ||||
-rw-r--r-- | db/repmgr/repmgr_windows.c | 715 |
11 files changed, 0 insertions, 5579 deletions
diff --git a/db/repmgr/repmgr_elect.c b/db/repmgr/repmgr_elect.c deleted file mode 100644 index bf2b41dac..000000000 --- a/db/repmgr/repmgr_elect.c +++ /dev/null @@ -1,384 +0,0 @@ -/*- - * See the file LICENSE for redistribution information. - * - * Copyright (c) 2005,2007 Oracle. All rights reserved. - * - * $Id: repmgr_elect.c,v 1.31 2007/05/17 15:15:50 bostic Exp $ - */ - -#include "db_config.h" - -#define __INCLUDE_NETWORKING 1 -#include "db_int.h" - -static int __repmgr_is_ready __P((DB_ENV *)); -static int __repmgr_elect_main __P((DB_ENV *)); -static void *__repmgr_elect_thread __P((void *)); -static int start_election_thread __P((DB_ENV *)); - -/* - * Starts the election thread, or wakes up an existing one, starting off with - * the specified operation (an election, or a call to rep_start(CLIENT), or - * nothing). Avoid multiple concurrent elections. - * - * PUBLIC: int __repmgr_init_election __P((DB_ENV *, int)); - * - * !!! - * Caller must hold mutex. - */ -int -__repmgr_init_election(dbenv, initial_operation) - DB_ENV *dbenv; - int initial_operation; -{ - DB_REP *db_rep; - int ret; - - db_rep = dbenv->rep_handle; - if (db_rep->finished) { - RPRINT(dbenv, (dbenv, - "ignoring elect thread request %d; repmgr is finished", - initial_operation)); - return (0); - } - - db_rep->operation_needed = initial_operation; - if (db_rep->elect_thread == NULL) - ret = start_election_thread(dbenv); - else if (db_rep->elect_thread->finished) { - RPRINT(dbenv, (dbenv, "join dead elect thread")); - if ((ret = __repmgr_thread_join(db_rep->elect_thread)) != 0) - return (ret); - __os_free(dbenv, db_rep->elect_thread); - db_rep->elect_thread = NULL; - ret = start_election_thread(dbenv); - } else { - RPRINT(dbenv, (dbenv, "reusing existing elect thread")); - if ((ret = __repmgr_signal(&db_rep->check_election)) != 0) - __db_err(dbenv, ret, "can't signal election thread"); - } - return (ret); -} - -/* - * !!! - * Caller holds mutex. - */ -static int -start_election_thread(dbenv) - DB_ENV *dbenv; -{ - DB_REP *db_rep; - REPMGR_RUNNABLE *elector; - int ret; - - db_rep = dbenv->rep_handle; - - if ((ret = __os_malloc(dbenv, sizeof(REPMGR_RUNNABLE), &elector)) - != 0) - return (ret); - elector->dbenv = dbenv; - elector->run = __repmgr_elect_thread; - - if ((ret = __repmgr_thread_start(dbenv, elector)) == 0) - db_rep->elect_thread = elector; - else - __os_free(dbenv, elector); - - return (ret); -} - -static void * -__repmgr_elect_thread(args) - void *args; -{ - DB_ENV *dbenv = args; - int ret; - - RPRINT(dbenv, (dbenv, "starting election thread")); - - if ((ret = __repmgr_elect_main(dbenv)) != 0) { - __db_err(dbenv, ret, "election thread failed"); - __repmgr_thread_failure(dbenv, ret); - } - - RPRINT(dbenv, (dbenv, "election thread is exiting")); - return (NULL); -} - -static int -__repmgr_elect_main(dbenv) - DB_ENV *dbenv; -{ - DB_REP *db_rep; - DBT my_addr; -#ifdef DB_WIN32 - DWORD duration; -#else - struct timespec deadline; -#endif - u_int nsites, nvotes; - int done, failure_recovery, last_op; - int need_success, ret, succeeded, to_do; - - COMPQUIET(need_success, TRUE); - - db_rep = dbenv->rep_handle; - last_op = 0; - failure_recovery = succeeded = FALSE; - - /* - * db_rep->operation_needed is the mechanism by which the outside world - * (running in a different thread) tells us what it wants us to do. It - * is obviously relevant when we're just starting up. But it can also - * be set if a subsequent request for us to do something occurs while - * we're still looping. - * - * ELECT_FAILURE_ELECTION asks us to start by doing an election, but to - * do so in failure recovery mode. This failure recovery mode may - * persist through several loop iterations: as long as it takes us to - * succeed in finding a master, or until we get asked to perform a new - * request. Thus the time for mapping ELECT_FAILURE_ELECTION to the - * internal ELECT_ELECTION, as well as the setting of the failure - * recovery flag, is at the point we receive the new request from - * operation_needed (either here, or within the loop below). - */ - LOCK_MUTEX(db_rep->mutex); - if (db_rep->finished) { - db_rep->elect_thread->finished = TRUE; - UNLOCK_MUTEX(db_rep->mutex); - return (0); - } - to_do = db_rep->operation_needed; - db_rep->operation_needed = 0; - UNLOCK_MUTEX(db_rep->mutex); - - /* - * The way we are invoked determines the criterion for completion (which - * is represented as "need_success"): if we've been asked to do an - * election, we're only "done" when an election has actually succeeded. - * If we're just here trying to find the master initially, then merely - * getting a valid master_eid suffices. - */ - switch (to_do) { - case ELECT_FAILURE_ELECTION: - failure_recovery = TRUE; - to_do = ELECT_ELECTION; - /* FALLTHROUGH */ - case ELECT_ELECTION: - need_success = TRUE; - break; - case ELECT_SEEK_MASTER: - to_do = 0; /* Caller has already called rep_start. */ - /* FALLTHROUGH */ - case ELECT_REPSTART: - need_success = FALSE; - break; - default: - DB_ASSERT(dbenv, FALSE); - } - /* Here, need_success has been initialized. */ - - for (;;) { - RPRINT(dbenv, (dbenv, "elect thread to do: %d", to_do)); - switch (to_do) { - case ELECT_ELECTION: - nsites = __repmgr_get_nsites(db_rep); - - nvotes = ELECTION_MAJORITY(nsites); - - /* - * If we're doing an election because we noticed that - * the master failed, it's reasonable to expect that the - * master won't participate. By not waiting for its - * vote, we can probably complete the election faster. - * But note that we shouldn't allow this to affect - * nvotes calculation. - */ - if (failure_recovery) { - nsites--; - - if (nsites == 1) { - /* - * We've just lost the only other site - * in the group, so there's no point in - * holding an election. - */ - if ((ret = __repmgr_become_master( - dbenv)) != 0) - return (ret); - break; - } - } - - switch (ret = __rep_elect(dbenv, - (int)nsites, (int)nvotes, 0)) { - case DB_REP_UNAVAIL: - break; - - case 0: - succeeded = TRUE; - if (db_rep->takeover_pending) { - db_rep->takeover_pending = FALSE; - if ((ret = - __repmgr_become_master(dbenv)) != 0) - return (ret); - } - break; - - default: - __db_err( - dbenv, ret, "unexpected election failure"); - return (ret); - } - last_op = ELECT_ELECTION; - break; - case ELECT_REPSTART: - if ((ret = - __repmgr_prepare_my_addr(dbenv, &my_addr)) != 0) - return (ret); - ret = __rep_start(dbenv, &my_addr, DB_REP_CLIENT); - __os_free(dbenv, my_addr.data); - if (ret != 0) { - __db_err(dbenv, ret, "rep_start"); - return (ret); - } - last_op = ELECT_REPSTART; - break; - case 0: - /* - * Nothing to do: this can happen the first time - * through, on initialization. - */ - last_op = 0; - break; - default: - DB_ASSERT(dbenv, FALSE); - } - - LOCK_MUTEX(db_rep->mutex); - while (!succeeded && !__repmgr_is_ready(dbenv)) { -#ifdef DB_WIN32 - duration = db_rep->election_retry_wait / US_PER_MS; - ret = SignalObjectAndWait(db_rep->mutex, - db_rep->check_election, duration, FALSE); - LOCK_MUTEX(db_rep->mutex); - if (ret == WAIT_TIMEOUT) - break; - DB_ASSERT(dbenv, ret == WAIT_OBJECT_0); -#else - __repmgr_compute_wait_deadline(dbenv, &deadline, - db_rep->election_retry_wait); - if ((ret = pthread_cond_timedwait( - &db_rep->check_election, &db_rep->mutex, &deadline)) - == ETIMEDOUT) - break; - DB_ASSERT(dbenv, ret == 0); -#endif - } - - /* - * Ways we can get here: election succeeded, sleep duration - * expired, "operation needed", or thread shut-down command. - * - * If we're not yet done, figure out what to do next (which may - * be trivially easy if we've been told explicitly, via the - * "operation needed" flag). We must first check if we've been - * told to do a specific operation, because that could make our - * completion criterion more stringent. Note that we never - * lessen our completion criterion (i.e., unlike the initial - * case, we may leave need_success untouched here). - */ - done = FALSE; - if ((to_do = db_rep->operation_needed) != 0) { - db_rep->operation_needed = 0; - switch (to_do) { - case ELECT_FAILURE_ELECTION: - failure_recovery = TRUE; - to_do = ELECT_ELECTION; - /* FALLTHROUGH */ - case ELECT_ELECTION: - need_success = TRUE; - break; - case ELECT_SEEK_MASTER: - to_do = 0; - break; - default: - break; - } - } else if ((done = (succeeded || - (!need_success && IS_VALID_EID(db_rep->master_eid)) || - db_rep->finished))) - db_rep->elect_thread->finished = TRUE; - else { - if (last_op == ELECT_ELECTION) - to_do = ELECT_REPSTART; - else { - /* - * Generally, if what we previously did is a - * rep_start (or nothing, which really just - * means another thread did the rep_start before - * turning us on), then we next do an election. - * However, with the REP_CLIENT init policy we - * never do an initial election. - */ - to_do = ELECT_ELECTION; - if (db_rep->init_policy == DB_REP_CLIENT && - !db_rep->found_master) - to_do = ELECT_REPSTART; - } - } - - UNLOCK_MUTEX(db_rep->mutex); - if (done) - return (0); - } -} - -/* - * Tests whether another thread has signalled for our attention. - */ -static int -__repmgr_is_ready(dbenv) - DB_ENV *dbenv; -{ - DB_REP *db_rep; - - db_rep = dbenv->rep_handle; - - RPRINT(dbenv, (dbenv, - "repmgr elect: opcode %d, finished %d, master %d", - db_rep->operation_needed, db_rep->finished, db_rep->master_eid)); - - return (db_rep->operation_needed || db_rep->finished); -} - -/* - * PUBLIC: int __repmgr_become_master __P((DB_ENV *)); - */ -int -__repmgr_become_master(dbenv) - DB_ENV *dbenv; -{ - DB_REP *db_rep; - DBT my_addr; - int ret; - - db_rep = dbenv->rep_handle; - db_rep->master_eid = SELF_EID; - db_rep->found_master = TRUE; - - /* - * At the moment, it's useless to pass my address to rep_start here, - * because rep_start ignores it in the case of MASTER. So we could - * avoid the trouble of allocating and freeing this memory. But might - * this conceivably change in the future? - */ - if ((ret = __repmgr_prepare_my_addr(dbenv, &my_addr)) != 0) - return (ret); - ret = __rep_start(dbenv, &my_addr, DB_REP_MASTER); - __os_free(dbenv, my_addr.data); - __repmgr_stash_generation(dbenv); - - return (ret); -} diff --git a/db/repmgr/repmgr_method.c b/db/repmgr/repmgr_method.c deleted file mode 100644 index 723bde7e7..000000000 --- a/db/repmgr/repmgr_method.c +++ /dev/null @@ -1,421 +0,0 @@ -/*- - * See the file LICENSE for redistribution information. - * - * Copyright (c) 2005,2007 Oracle. All rights reserved. - * - * $Id: repmgr_method.c,v 1.38 2007/06/11 18:29:34 alanb Exp $ - */ - -#include "db_config.h" - -#define __INCLUDE_NETWORKING 1 -#include "db_int.h" - -static int __repmgr_await_threads __P((DB_ENV *)); - -/* - * PUBLIC: int __repmgr_start __P((DB_ENV *, int, u_int32_t)); - */ -int -__repmgr_start(dbenv, nthreads, flags) - DB_ENV *dbenv; - int nthreads; - u_int32_t flags; -{ - DB_REP *db_rep; - DBT my_addr; - REPMGR_RUNNABLE *selector, *messenger; - int ret, i; - - db_rep = dbenv->rep_handle; - - /* Check that the required initialization has been done. */ - if (db_rep->my_addr.port == 0) { - __db_errx(dbenv, - "repmgr_set_local_site must be called before repmgr_start"); - return (EINVAL); - } - - if (db_rep->selector != NULL || db_rep->finished) { - __db_errx(dbenv, - "DB_ENV->repmgr_start may not be called more than once"); - return (EINVAL); - } - - switch (flags) { - case DB_REP_CLIENT: - case DB_REP_ELECTION: - case DB_REP_MASTER: - break; - default: - __db_errx(dbenv, - "repmgr_start: unrecognized flags parameter value"); - return (EINVAL); - } - - if (nthreads <= 0) { - __db_errx(dbenv, - "repmgr_start: nthreads parameter must be >= 1"); - return (EINVAL); - } - - if ((ret = - __os_calloc(dbenv, (u_int)nthreads, sizeof(REPMGR_RUNNABLE *), - &db_rep->messengers)) != 0) - return (ret); - db_rep->nthreads = nthreads; - - if ((ret = __repmgr_net_init(dbenv, db_rep)) != 0 || - (ret = __repmgr_init_sync(dbenv, db_rep)) != 0 || - (ret = __rep_set_transport(dbenv, SELF_EID, __repmgr_send)) != 0) - return (ret); - - /* - * Make some sort of call to rep_start before starting other threads, to - * ensure that incoming messages being processed always have a rep - * context properly configured. - */ - if ((db_rep->init_policy = flags) == DB_REP_MASTER) - ret = __repmgr_become_master(dbenv); - else { - if ((ret = __repmgr_prepare_my_addr(dbenv, &my_addr)) != 0) - return (ret); - ret = __rep_start(dbenv, &my_addr, DB_REP_CLIENT); - __os_free(dbenv, my_addr.data); - if (ret == 0) { - LOCK_MUTEX(db_rep->mutex); - ret = __repmgr_init_election(dbenv, ELECT_SEEK_MASTER); - UNLOCK_MUTEX(db_rep->mutex); - } - } - if (ret != 0) - return (ret); - - if ((ret = __os_calloc(dbenv, 1, sizeof(REPMGR_RUNNABLE), &selector)) - != 0) - return (ret); - selector->dbenv = dbenv; - selector->run = __repmgr_select_thread; - if ((ret = __repmgr_thread_start(dbenv, selector)) != 0) { - __db_err(dbenv, ret, "can't start selector thread"); - __os_free(dbenv, selector); - return (ret); - } else - db_rep->selector = selector; - - for (i=0; i<nthreads; i++) { - if ((ret = __os_calloc(dbenv, 1, sizeof(REPMGR_RUNNABLE), - &messenger)) != 0) - return (ret); - - messenger->dbenv = dbenv; - messenger->run = __repmgr_msg_thread; - if ((ret = __repmgr_thread_start(dbenv, messenger)) != 0) { - __os_free(dbenv, messenger); - return (ret); - } - db_rep->messengers[i] = messenger; - } - - return (ret); -} - -/* - * PUBLIC: int __repmgr_close __P((DB_ENV *)); - */ -int -__repmgr_close(dbenv) - DB_ENV *dbenv; -{ - DB_REP *db_rep; - int ret, t_ret; - - ret = 0; - db_rep = dbenv->rep_handle; - if (db_rep->selector != NULL) { - RPRINT(dbenv, (dbenv, "Stopping repmgr threads")); - ret = __repmgr_stop_threads(dbenv); - if ((t_ret = __repmgr_await_threads(dbenv)) != 0 && ret == 0) - ret = t_ret; - RPRINT(dbenv, (dbenv, "Repmgr threads are finished")); - } - - if ((t_ret = __repmgr_net_close(dbenv)) != 0 && ret == 0) - ret = t_ret; - - if ((t_ret = __repmgr_close_sync(dbenv)) != 0 && ret == 0) - ret = t_ret; - - return (ret); -} - -/* - * PUBLIC: int __repmgr_set_ack_policy __P((DB_ENV *, int)); - */ -int -__repmgr_set_ack_policy(dbenv, policy) - DB_ENV *dbenv; - int policy; -{ - switch (policy) { - case DB_REPMGR_ACKS_ALL: /* FALLTHROUGH */ - case DB_REPMGR_ACKS_ALL_PEERS: /* FALLTHROUGH */ - case DB_REPMGR_ACKS_NONE: /* FALLTHROUGH */ - case DB_REPMGR_ACKS_ONE: /* FALLTHROUGH */ - case DB_REPMGR_ACKS_ONE_PEER: /* FALLTHROUGH */ - case DB_REPMGR_ACKS_QUORUM: - dbenv->rep_handle->perm_policy = policy; - return (0); - default: - __db_errx(dbenv, - "Unknown ack_policy in DB_ENV->repmgr_set_ack_policy"); - return (EINVAL); - } -} - -/* - * PUBLIC: int __repmgr_get_ack_policy __P((DB_ENV *, int *)); - */ -int -__repmgr_get_ack_policy(dbenv, policy) - DB_ENV *dbenv; - int *policy; -{ - *policy = dbenv->rep_handle->perm_policy; - return (0); -} - -/* - * PUBLIC: int __repmgr_env_create __P((DB_ENV *, DB_REP *)); - */ -int -__repmgr_env_create(dbenv, db_rep) - DB_ENV *dbenv; - DB_REP *db_rep; -{ - int ret; - - /* Set some default values. */ - db_rep->ack_timeout = 1 * US_PER_SEC; /* 1 second */ - db_rep->connection_retry_wait = 30 * US_PER_SEC; /* 30 seconds */ - db_rep->election_retry_wait = 10 * US_PER_SEC; /* 10 seconds */ - db_rep->config_nsites = 0; - db_rep->peer = DB_EID_INVALID; - db_rep->perm_policy = DB_REPMGR_ACKS_QUORUM; - -#ifdef DB_WIN32 - db_rep->waiters = NULL; -#else - db_rep->read_pipe = db_rep->write_pipe = -1; -#endif - if ((ret = __repmgr_net_create(dbenv, db_rep)) == 0) - ret = __repmgr_queue_create(dbenv, db_rep); - - return (ret); -} - -/* - * PUBLIC: void __repmgr_env_destroy __P((DB_ENV *, DB_REP *)); - */ -void -__repmgr_env_destroy(dbenv, db_rep) - DB_ENV *dbenv; - DB_REP *db_rep; -{ - __repmgr_queue_destroy(dbenv); - __repmgr_net_destroy(dbenv, db_rep); - if (db_rep->messengers != NULL) { - __os_free(dbenv, db_rep->messengers); - db_rep->messengers = NULL; - } -} - -/* - * PUBLIC: int __repmgr_stop_threads __P((DB_ENV *)); - */ -int -__repmgr_stop_threads(dbenv) - DB_ENV *dbenv; -{ - DB_REP *db_rep; - int ret; - - db_rep = dbenv->rep_handle; - - /* - * Hold mutex for the purpose of waking up threads, but then get out of - * the way to let them clean up and exit. - */ - LOCK_MUTEX(db_rep->mutex); - db_rep->finished = TRUE; - if (db_rep->elect_thread != NULL && - (ret = __repmgr_signal(&db_rep->check_election)) != 0) - goto unlock; - - if ((ret = __repmgr_signal(&db_rep->queue_nonempty)) != 0) - goto unlock; - UNLOCK_MUTEX(db_rep->mutex); - - return (__repmgr_wake_main_thread(dbenv)); - -unlock: - UNLOCK_MUTEX(db_rep->mutex); - return (ret); -} - -static int -__repmgr_await_threads(dbenv) - DB_ENV *dbenv; -{ - DB_REP *db_rep; - REPMGR_RUNNABLE *messenger; - int ret, t_ret, i; - - db_rep = dbenv->rep_handle; - ret = 0; - if (db_rep->elect_thread != NULL) { - ret = __repmgr_thread_join(db_rep->elect_thread); - __os_free(dbenv, db_rep->elect_thread); - db_rep->elect_thread = NULL; - } - - for (i=0; i<db_rep->nthreads && db_rep->messengers[i] != NULL; i++) { - messenger = db_rep->messengers[i]; - if ((t_ret = __repmgr_thread_join(messenger)) != 0 && ret == 0) - ret = t_ret; - __os_free(dbenv, messenger); - db_rep->messengers[i] = NULL; /* necessary? */ - } - __os_free(dbenv, db_rep->messengers); - db_rep->messengers = NULL; - - if (db_rep->selector != NULL) { - if ((t_ret = __repmgr_thread_join(db_rep->selector)) != 0 && - ret == 0) - ret = t_ret; - __os_free(dbenv, db_rep->selector); - db_rep->selector = NULL; - } - - return (ret); -} - -/* - * PUBLIC: int __repmgr_set_local_site __P((DB_ENV *, const char *, u_int, - * PUBLIC: u_int32_t)); - */ -int -__repmgr_set_local_site(dbenv, host, port, flags) - DB_ENV *dbenv; - const char *host; - u_int port; - u_int32_t flags; -{ - ADDRINFO *address_list; - DB_REP *db_rep; - repmgr_netaddr_t addr; - int locked, ret; - const char *sharable_host; - char buffer[MAXHOSTNAMELEN]; - - if (flags != 0) - return (__db_ferr(dbenv, "DB_ENV->repmgr_set_local_site", 0)); - - db_rep = dbenv->rep_handle; - if (db_rep->my_addr.port != 0) { - __db_errx(dbenv, "Listen address already set"); - return (EINVAL); - } - - /* - * If we haven't been given a sharable local host name, we must get one - * for the purpose of sharing with our friends, but it's ok to use for - * the address look-up. "Helpfully" converting special names such as - * "localhost" in this same way is tempting, but in practice turns out - * to be too tricky. - */ - if (host == NULL) { - if ((ret = gethostname(buffer, sizeof(buffer))) != 0) - return (net_errno); - - /* In case truncation leaves no terminating NUL byte: */ - buffer[sizeof(buffer) - 1] = '\0'; - sharable_host = buffer; - } else - sharable_host = host; - - if ((ret = __repmgr_getaddr(dbenv, - host, port, AI_PASSIVE, &address_list)) != 0) - return (ret); - - if ((ret = __repmgr_pack_netaddr(dbenv, - sharable_host, port, address_list, &addr)) != 0) { - __db_freeaddrinfo(dbenv, address_list); - return (ret); - } - - if (REPMGR_SYNC_INITED(db_rep)) { - LOCK_MUTEX(db_rep->mutex); - locked = TRUE; - } else - locked = FALSE; - - memcpy(&db_rep->my_addr, &addr, sizeof(addr)); - - if (locked) - UNLOCK_MUTEX(db_rep->mutex); - return (0); -} - -/* - * If the application only calls this method from a single thread (e.g., during - * its initialization), it will avoid the problems with the non-thread-safe host - * name lookup. In any case, if we relegate the blocking lookup to here it - * won't affect our select() loop. - * - * PUBLIC: int __repmgr_add_remote_site __P((DB_ENV *, const char *, u_int, - * PUBLIC: int *, u_int32_t)); - */ -int -__repmgr_add_remote_site(dbenv, host, port, eidp, flags) - DB_ENV *dbenv; - const char *host; - u_int port; - int *eidp; - u_int32_t flags; -{ - DB_REP *db_rep; - REPMGR_SITE *site; - int eid, locked, ret; - - if ((ret = __db_fchk(dbenv, - "DB_ENV->repmgr_add_remote_site", flags, DB_REPMGR_PEER)) != 0) - return (ret); - - if (host == NULL) { - __db_errx(dbenv, - "repmgr_add_remote_site: host name is required"); - return (EINVAL); - } - - db_rep = dbenv->rep_handle; - - if (REPMGR_SYNC_INITED(db_rep)) { - LOCK_MUTEX(db_rep->mutex); - locked = TRUE; - } else - locked = FALSE; - - if ((ret = __repmgr_add_site(dbenv, host, port, &site)) != 0) - goto unlock; - eid = EID_FROM_SITE(site); - - if (LF_ISSET(DB_REPMGR_PEER)) - db_rep->peer = eid; - if (eidp != NULL) - *eidp = eid; - -unlock: if (locked) - UNLOCK_MUTEX(db_rep->mutex); - return (ret); -} diff --git a/db/repmgr/repmgr_msg.c b/db/repmgr/repmgr_msg.c deleted file mode 100644 index 6b70bbbe1..000000000 --- a/db/repmgr/repmgr_msg.c +++ /dev/null @@ -1,341 +0,0 @@ -/*- - * See the file LICENSE for redistribution information. - * - * Copyright (c) 2005,2007 Oracle. All rights reserved. - * - * $Id: repmgr_msg.c,v 1.36 2007/06/11 18:29:34 alanb Exp $ - */ - -#include "db_config.h" - -#define __INCLUDE_NETWORKING 1 -#include "db_int.h" - -static int message_loop __P((DB_ENV *)); -static int process_message __P((DB_ENV*, DBT*, DBT*, int)); -static int handle_newsite __P((DB_ENV *, const DBT *)); -static int ack_message __P((DB_ENV *, u_int32_t, DB_LSN *)); - -/* - * PUBLIC: void *__repmgr_msg_thread __P((void *)); - */ -void * -__repmgr_msg_thread(args) - void *args; -{ - DB_ENV *dbenv = args; - int ret; - - if ((ret = message_loop(dbenv)) != 0) { - __db_err(dbenv, ret, "message thread failed"); - __repmgr_thread_failure(dbenv, ret); - } - return (NULL); -} - -static int -message_loop(dbenv) - DB_ENV *dbenv; -{ - REPMGR_MESSAGE *msg; - int ret; - - while ((ret = __repmgr_queue_get(dbenv, &msg)) == 0) { - while ((ret = process_message(dbenv, &msg->control, &msg->rec, - msg->originating_eid)) == DB_LOCK_DEADLOCK) - RPRINT(dbenv, (dbenv, "repmgr deadlock retry")); - - __os_free(dbenv, msg); - if (ret != 0) - return (ret); - } - - return (ret == DB_REP_UNAVAIL ? 0 : ret); -} - -static int -process_message(dbenv, control, rec, eid) - DB_ENV *dbenv; - DBT *control, *rec; - int eid; -{ - DB_REP *db_rep; - REP *rep; - DB_LSN permlsn; - int ret; - u_int32_t generation; - - db_rep = dbenv->rep_handle; - - /* - * Save initial generation number, in case it changes in a close race - * with a NEWMASTER. See msgdir.10000/10039/msg00086.html. - */ - generation = db_rep->generation; - - switch (ret = - __rep_process_message(dbenv, control, rec, eid, &permlsn)) { - case 0: - if (db_rep->takeover_pending) { - db_rep->takeover_pending = FALSE; - return (__repmgr_become_master(dbenv)); - } - break; - - case DB_REP_NEWSITE: - return (handle_newsite(dbenv, rec)); - - case DB_REP_HOLDELECTION: - LOCK_MUTEX(db_rep->mutex); - ret = __repmgr_init_election(dbenv, ELECT_ELECTION); - UNLOCK_MUTEX(db_rep->mutex); - if (ret != 0) - return (ret); - break; - - case DB_REP_DUPMASTER: - if ((ret = __repmgr_repstart(dbenv, DB_REP_CLIENT)) != 0) - return (ret); - LOCK_MUTEX(db_rep->mutex); - ret = __repmgr_init_election(dbenv, ELECT_ELECTION); - UNLOCK_MUTEX(db_rep->mutex); - if (ret != 0) - return (ret); - break; - - case DB_REP_ISPERM: - /* - * Don't bother sending ack if master doesn't care about it. - */ - rep = db_rep->region; - if (db_rep->perm_policy == DB_REPMGR_ACKS_NONE || - (IS_PEER_POLICY(db_rep->perm_policy) && - rep->priority == 0)) - break; - - if ((ret = ack_message(dbenv, generation, &permlsn)) != 0) - return (ret); - - break; - - case DB_REP_NOTPERM: /* FALLTHROUGH */ - case DB_REP_IGNORE: /* FALLTHROUGH */ - case DB_LOCK_DEADLOCK: - break; - - default: - __db_err(dbenv, ret, "DB_ENV->rep_process_message"); - return (ret); - } - return (0); -} - -/* - * Handle replication-related events. Returns only 0 or DB_EVENT_NOT_HANDLED; - * no other error returns are tolerated. - * - * PUBLIC: int __repmgr_handle_event __P((DB_ENV *, u_int32_t, void *)); - */ -int -__repmgr_handle_event(dbenv, event, info) - DB_ENV *dbenv; - u_int32_t event; - void *info; -{ - DB_REP *db_rep; - - db_rep = dbenv->rep_handle; - - if (db_rep->selector == NULL) { - /* Repmgr is not in use, so all events go to application. */ - return (DB_EVENT_NOT_HANDLED); - } - - switch (event) { - case DB_EVENT_REP_ELECTED: - DB_ASSERT(dbenv, info == NULL); - - db_rep->found_master = TRUE; - db_rep->takeover_pending = TRUE; - - /* - * The application doesn't really need to see this, because the - * purpose of this event is to tell the winning site that it - * should call rep_start(MASTER), and in repmgr we do that - * automatically. Still, they could conceivably be curious, and - * it doesn't hurt anything to let them know. - */ - break; - case DB_EVENT_REP_NEWMASTER: - DB_ASSERT(dbenv, info != NULL); - - db_rep->found_master = TRUE; - db_rep->master_eid = *(int *)info; - __repmgr_stash_generation(dbenv); - - /* Application still needs to see this. */ - break; - default: - break; - } - return (DB_EVENT_NOT_HANDLED); -} - -/* - * Acknowledges a message. - * - * !!! - * Note that this cannot be called from the select() thread, in case we call - * __repmgr_bust_connection(..., FALSE). - */ -static int -ack_message(dbenv, generation, lsn) - DB_ENV *dbenv; - u_int32_t generation; - DB_LSN *lsn; -{ - DB_REP *db_rep; - REPMGR_SITE *site; - REPMGR_CONNECTION *conn; - DB_REPMGR_ACK ack; - DBT control2, rec2; - int ret; - - db_rep = dbenv->rep_handle; - /* - * Regardless of where a message came from, all ack's go to the master - * site. If we're not in touch with the master, we drop it, since - * there's not much else we can do. - */ - if (!IS_VALID_EID(db_rep->master_eid) || - db_rep->master_eid == SELF_EID) { - RPRINT(dbenv, (dbenv, - "dropping ack with master %d", db_rep->master_eid)); - return (0); - } - - ret = 0; - LOCK_MUTEX(db_rep->mutex); - site = SITE_FROM_EID(db_rep->master_eid); - if (site->state == SITE_CONNECTED && - !F_ISSET(site->ref.conn, CONN_CONNECTING)) { - - ack.generation = generation; - memcpy(&ack.lsn, lsn, sizeof(DB_LSN)); - control2.data = &ack; - control2.size = sizeof(ack); - rec2.size = 0; - - conn = site->ref.conn; - if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_ACK, - &control2, &rec2)) == DB_REP_UNAVAIL) - ret = __repmgr_bust_connection(dbenv, conn, FALSE); - } - - UNLOCK_MUTEX(db_rep->mutex); - return (ret); -} - -/* - * Does everything necessary to handle the processing of a NEWSITE return. - */ -static int -handle_newsite(dbenv, rec) - DB_ENV *dbenv; - const DBT *rec; -{ - ADDRINFO *ai; - DB_REP *db_rep; - REPMGR_SITE *site; - SITE_STRING_BUFFER buffer; - repmgr_netaddr_t *addr; - size_t hlen; - u_int16_t port; - int ret; - char *host; - - db_rep = dbenv->rep_handle; - /* - * Check if we got sent connect information and if we did, if - * this is me or if we already have a connection to this new - * site. If we don't, establish a new one. - * - * Unmarshall the cdata: a 2-byte port number, in network byte order, - * followed by the host name string, which should already be - * null-terminated, but let's make sure. - */ - if (rec->size < sizeof(port) + 1) { - __db_errx(dbenv, "unexpected cdata size, msg ignored"); - return (0); - } - memcpy(&port, rec->data, sizeof(port)); - port = ntohs(port); - - host = (char*)((u_int8_t*)rec->data + sizeof(port)); - hlen = (rec->size - sizeof(port)) - 1; - host[hlen] = '\0'; - - /* It's me, do nothing. */ - if (strcmp(host, db_rep->my_addr.host) == 0 && - port == db_rep->my_addr.port) { - RPRINT(dbenv, (dbenv, "repmgr ignores own NEWSITE info")); - return (0); - } - - LOCK_MUTEX(db_rep->mutex); - if ((ret = __repmgr_add_site(dbenv, host, port, &site)) == EEXIST) { - RPRINT(dbenv, (dbenv, - "NEWSITE info from %s was already known", - __repmgr_format_site_loc(site, buffer))); - /* - * In case we already know about this site only because it - * first connected to us, we may not yet have had a chance to - * look up its addresses. Even though we don't need them just - * now, this is an advantageous opportunity to get them since we - * can do so away from the critical select thread. Give up only - * for a disastrous failure. - */ - addr = &site->net_addr; - if (addr->address_list == NULL) { - if ((ret = __repmgr_getaddr(dbenv, - addr->host, addr->port, 0, &ai)) == 0) - addr->address_list = ai; - else if (ret != DB_REP_UNAVAIL) - goto unlock; - } - - ret = 0; - if (site->state == SITE_CONNECTED) - goto unlock; /* Nothing to do. */ - } else { - if (ret != 0) - goto unlock; - RPRINT(dbenv, (dbenv, "NEWSITE info added %s", - __repmgr_format_site_loc(site, buffer))); - } - - /* - * Wake up the main thread to connect to the new or reawakened - * site. - */ - ret = __repmgr_wake_main_thread(dbenv); - -unlock: UNLOCK_MUTEX(db_rep->mutex); - return (ret); -} - -/* - * PUBLIC: void __repmgr_stash_generation __P((DB_ENV *)); - */ -void -__repmgr_stash_generation(dbenv) - DB_ENV *dbenv; -{ - DB_REP *db_rep; - REP *rep; - - db_rep = dbenv->rep_handle; - rep = db_rep->region; - - db_rep->generation = rep->gen; -} diff --git a/db/repmgr/repmgr_net.c b/db/repmgr/repmgr_net.c deleted file mode 100644 index 95fdeee1f..000000000 --- a/db/repmgr/repmgr_net.c +++ /dev/null @@ -1,1075 +0,0 @@ -/*- - * See the file LICENSE for redistribution information. - * - * Copyright (c) 2005,2007 Oracle. All rights reserved. - * - * $Id: repmgr_net.c,v 1.55 2007/06/11 18:29:34 alanb Exp $ - */ - -#include "db_config.h" - -#define __INCLUDE_NETWORKING 1 -#include "db_int.h" -#include "dbinc/mp.h" - -/* - * The functions in this module implement a simple wire protocol for - * transmitting messages, both replication messages and our own internal control - * messages. The protocol is as follows: - * - * 1 byte - message type (defined in repmgr_int.h) - * 4 bytes - size of control - * 4 bytes - size of rec - * ? bytes - control - * ? bytes - rec - * - * where both sizes are 32-bit binary integers in network byte order. - * Either control or rec can have zero length, but even in this case the - * 4-byte length will be present. - * Putting both lengths right up at the front allows us to read in fewer - * phases, and allows us to allocate buffer space for both parts (plus a wrapper - * struct) at once. - */ - -/* - * In sending a message, we first try to send it in-line, in the sending thread, - * and without first copying the message, by using scatter/gather I/O, using - * iovecs to point to the various pieces of the message. If that all works - * without blocking, that's optimal. - * If we find that, for a particular connection, we can't send without - * blocking, then we must copy the message for sending later in the select() - * thread. In the course of doing that, we might as well "flatten" the message, - * forming one single buffer, to simplify life. Not only that, once we've gone - * to the trouble of doing that, other sites to which we also want to send the - * message (in the case of a broadcast), may as well take advantage of the - * simplified structure also. - * This structure holds it all. Note that this structure, and the - * "flat_msg" structure, are allocated separately, because (1) the flat_msg - * version is usually not needed; and (2) when it is needed, it will need to - * live longer than the wrapping sending_msg structure. - * Note that, for the broadcast case, where we're going to use this - * repeatedly, the iovecs is a template that must be copied, since in normal use - * the iovecs pointers and lengths get adjusted after every partial write. - */ -struct sending_msg { - REPMGR_IOVECS iovecs; - u_int8_t type; - u_int32_t control_size_buf, rec_size_buf; - REPMGR_FLAT *fmsg; -}; - -static int __repmgr_send_broadcast - __P((DB_ENV *, const DBT *, const DBT *, u_int *, u_int *)); -static void setup_sending_msg - __P((struct sending_msg *, u_int, const DBT *, const DBT *)); -static int __repmgr_send_internal - __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *)); -static int enqueue_msg - __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, size_t)); -static int flatten __P((DB_ENV *, struct sending_msg *)); -static REPMGR_SITE *__repmgr_available_site __P((DB_ENV *, int)); - -/* - * __repmgr_send -- - * The send function for DB_ENV->rep_set_transport. - * - * !!! - * This is only ever called as the replication transport call-back, which means - * it's either on one of our message processing threads or an application - * thread. It mustn't be called from the select() thread, because we might call - * __repmgr_bust_connection(..., FALSE) here, and that's not allowed in the - * select() thread. - * - * PUBLIC: int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *, - * PUBLIC: const DB_LSN *, int, u_int32_t)); - */ -int -__repmgr_send(dbenv, control, rec, lsnp, eid, flags) - DB_ENV *dbenv; - const DBT *control, *rec; - const DB_LSN *lsnp; - int eid; - u_int32_t flags; -{ - DB_REP *db_rep; - u_int nsites, npeers, available, needed; - int ret, t_ret; - REPMGR_SITE *site; - REPMGR_CONNECTION *conn; - - db_rep = dbenv->rep_handle; - - LOCK_MUTEX(db_rep->mutex); - if (eid == DB_EID_BROADCAST) { - if ((ret = __repmgr_send_broadcast(dbenv, control, rec, - &nsites, &npeers)) != 0) - goto out; - } else { - /* - * If this is a request that can be sent anywhere, then see if - * we can send it to our peer (to save load on the master), but - * not if it's a rerequest, 'cuz that likely means we tried this - * already and failed. - */ - if ((flags & (DB_REP_ANYWHERE | DB_REP_REREQUEST)) == - DB_REP_ANYWHERE && - IS_VALID_EID(db_rep->peer) && - (site = __repmgr_available_site(dbenv, db_rep->peer)) != - NULL) { - RPRINT(dbenv, (dbenv, "sending request to peer")); - } else if ((site = __repmgr_available_site(dbenv, eid)) == - NULL) { - RPRINT(dbenv, (dbenv, - "ignoring message sent to unavailable site")); - ret = DB_REP_UNAVAIL; - goto out; - } - - conn = site->ref.conn; - if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_REP_MESSAGE, - control, rec)) == DB_REP_UNAVAIL && - (t_ret = __repmgr_bust_connection(dbenv, conn, FALSE)) != 0) - ret = t_ret; - if (ret != 0) - goto out; - - nsites = 1; - npeers = site->priority > 0 ? 1 : 0; - } - /* - * Right now, nsites and npeers represent the (maximum) number of sites - * we've attempted to begin sending the message to. Of course we - * haven't really received any ack's yet. But since we've only sent to - * nsites/npeers other sites, that's the maximum number of ack's we - * could possibly expect. If even that number fails to satisfy our PERM - * policy, there's no point waiting for something that will never - * happen. - */ - if (LF_ISSET(DB_REP_PERMANENT)) { - switch (db_rep->perm_policy) { - case DB_REPMGR_ACKS_NONE: - goto out; - - case DB_REPMGR_ACKS_ONE: - needed = 1; - available = nsites; - break; - - case DB_REPMGR_ACKS_ALL: - /* Number of sites in the group besides myself. */ - needed = __repmgr_get_nsites(db_rep) - 1; - available = nsites; - break; - - case DB_REPMGR_ACKS_ONE_PEER: - needed = 1; - available = npeers; - break; - - case DB_REPMGR_ACKS_ALL_PEERS: - /* - * Too hard to figure out "needed", since we're not - * keeping track of how many peers we have; so just skip - * the optimization in this case. - */ - needed = 1; - available = npeers; - break; - - case DB_REPMGR_ACKS_QUORUM: - /* - * The minimum number of acks necessary to ensure that - * the transaction is durable if an election is held. - */ - needed = (__repmgr_get_nsites(db_rep) - 1) / 2; - available = npeers; - break; - - default: - COMPQUIET(available, 0); - COMPQUIET(needed, 0); - (void)__db_unknown_path(dbenv, "__repmgr_send"); - break; - } - if (available < needed) { - ret = DB_REP_UNAVAIL; - goto out; - } - /* In ALL_PEERS case, display of "needed" might be confusing. */ - RPRINT(dbenv, (dbenv, - "will await acknowledgement: need %u", needed)); - ret = __repmgr_await_ack(dbenv, lsnp); - } - -out: UNLOCK_MUTEX(db_rep->mutex); - if (ret != 0 && LF_ISSET(DB_REP_PERMANENT)) { - STAT(db_rep->region->mstat.st_perm_failed++); - DB_EVENT(dbenv, DB_EVENT_REP_PERM_FAILED, NULL); - } - return (ret); -} - -static REPMGR_SITE * -__repmgr_available_site(dbenv, eid) - DB_ENV *dbenv; - int eid; -{ - DB_REP *db_rep; - REPMGR_SITE *site; - - db_rep = dbenv->rep_handle; - site = SITE_FROM_EID(eid); - if (site->state != SITE_CONNECTED) - return (NULL); - - if (F_ISSET(site->ref.conn, CONN_CONNECTING)) - return (NULL); - return (site); -} - -/* - * Sends message to all sites with which we currently have an active - * connection. Sets result parameters according to how many sites we attempted - * to begin sending to, even if we did nothing more than queue it for later - * delivery. - * - * !!! - * Caller must hold dbenv->mutex. - * - * !!! - * Note that this cannot be called from the select() thread, in case we call - * __repmgr_bust_connection(..., FALSE). - */ -static int -__repmgr_send_broadcast(dbenv, control, rec, nsitesp, npeersp) - DB_ENV *dbenv; - const DBT *control, *rec; - u_int *nsitesp, *npeersp; -{ - DB_REP *db_rep; - struct sending_msg msg; - REPMGR_CONNECTION *conn; - REPMGR_SITE *site; - u_int nsites, npeers; - int ret; - - db_rep = dbenv->rep_handle; - - setup_sending_msg(&msg, REPMGR_REP_MESSAGE, control, rec); - nsites = npeers = 0; - - /* - * Traverse the connections list. Here, even in bust_connection, we - * don't unlink the current list entry, so we can use the TAILQ_FOREACH - * macro. - */ - TAILQ_FOREACH(conn, &db_rep->connections, entries) { - if (F_ISSET(conn, CONN_CONNECTING | CONN_DEFUNCT) || - !IS_VALID_EID(conn->eid)) - continue; - - if ((ret = __repmgr_send_internal(dbenv, conn, &msg)) == 0) { - site = SITE_FROM_EID(conn->eid); - nsites++; - if (site->priority > 0) - npeers++; - } else if (ret == DB_REP_UNAVAIL) { - if ((ret = __repmgr_bust_connection( - dbenv, conn, FALSE)) != 0) - return (ret); - } else - return (ret); - } - - *nsitesp = nsites; - *npeersp = npeers; - return (0); -} - -/* - * __repmgr_send_one -- - * Send a message to a site, or if you can't just yet, make a copy of it - * and arrange to have it sent later. 'rec' may be NULL, in which case we send - * a zero length and no data. - * - * If we get an error, we take care of cleaning up the connection (calling - * __repmgr_bust_connection()), so that the caller needn't do so. - * - * !!! - * Note that the mutex should be held through this call. - * It doubles as a synchronizer to make sure that two threads don't - * intersperse writes that are part of two single messages. - * - * PUBLIC: int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *, - * PUBLIC: u_int, const DBT *, const DBT *)); - */ -int -__repmgr_send_one(dbenv, conn, msg_type, control, rec) - DB_ENV *dbenv; - REPMGR_CONNECTION *conn; - u_int msg_type; - const DBT *control, *rec; -{ - struct sending_msg msg; - - setup_sending_msg(&msg, msg_type, control, rec); - return (__repmgr_send_internal(dbenv, conn, &msg)); -} - -/* - * Attempts a "best effort" to send a message on the given site. If there is an - * excessive backlog of message already queued on the connection, we simply drop - * this message, and still return 0 even in this case. - */ -static int -__repmgr_send_internal(dbenv, conn, msg) - DB_ENV *dbenv; - REPMGR_CONNECTION *conn; - struct sending_msg *msg; -{ -#define OUT_QUEUE_LIMIT 10 /* arbitrary, for now */ - REPMGR_IOVECS iovecs; - SITE_STRING_BUFFER buffer; - int ret; - size_t nw; - size_t total_written; - - DB_ASSERT(dbenv, !F_ISSET(conn, CONN_CONNECTING)); - if (!STAILQ_EMPTY(&conn->outbound_queue)) { - /* - * Output to this site is currently owned by the select() - * thread, so we can't try sending in-line here. We can only - * queue the msg for later. - */ - RPRINT(dbenv, (dbenv, "msg to %s to be queued", - __repmgr_format_eid_loc(dbenv->rep_handle, - conn->eid, buffer))); - if (conn->out_queue_length < OUT_QUEUE_LIMIT) - return (enqueue_msg(dbenv, conn, msg, 0)); - else { - RPRINT(dbenv, (dbenv, "queue limit exceeded")); - STAT(dbenv->rep_handle-> - region->mstat.st_msgs_dropped++); - return (0); - } - } - - /* - * Send as much data to the site as we can, without blocking. Keep - * writing as long as we're making some progress. Make a scratch copy - * of iovecs for our use, since we destroy it in the process of - * adjusting pointers after each partial I/O. - */ - memcpy(&iovecs, &msg->iovecs, sizeof(iovecs)); - total_written = 0; - while ((ret = __repmgr_writev(conn->fd, &iovecs.vectors[iovecs.offset], - iovecs.count-iovecs.offset, &nw)) == 0) { - total_written += nw; - if (__repmgr_update_consumed(&iovecs, nw)) /* all written */ - return (0); - } - - if (ret != WOULDBLOCK) { - __db_err(dbenv, ret, "socket writing failure"); - return (DB_REP_UNAVAIL); - } - - RPRINT(dbenv, (dbenv, "wrote only %lu bytes to %s", - (u_long)total_written, - __repmgr_format_eid_loc(dbenv->rep_handle, conn->eid, buffer))); - /* - * We can't send any more without blocking: queue (a pointer to) a - * "flattened" copy of the message, so that the select() thread will - * finish sending it later. - */ - if ((ret = enqueue_msg(dbenv, conn, msg, total_written)) != 0) - return (ret); - - STAT(dbenv->rep_handle->region->mstat.st_msgs_queued++); - - /* - * Wake the main select thread so that it can discover that it has - * received ownership of this connection. Note that we didn't have to - * do this in the previous case (above), because the non-empty queue - * implies that the select() thread is already managing ownership of - * this connection. - */ -#ifdef DB_WIN32 - if (WSAEventSelect(conn->fd, conn->event_object, - FD_READ|FD_WRITE|FD_CLOSE) == SOCKET_ERROR) { - ret = net_errno; - __db_err(dbenv, ret, "can't add FD_WRITE event bit"); - return (ret); - } -#endif - return (__repmgr_wake_main_thread(dbenv)); -} - -/* - * PUBLIC: int __repmgr_is_permanent __P((DB_ENV *, const DB_LSN *)); - * - * Count up how many sites have ack'ed the given LSN. Returns TRUE if enough - * sites have ack'ed; FALSE otherwise. - * - * !!! - * Caller must hold the mutex. - */ -int -__repmgr_is_permanent(dbenv, lsnp) - DB_ENV *dbenv; - const DB_LSN *lsnp; -{ - DB_REP *db_rep; - REPMGR_SITE *site; - u_int eid, nsites, npeers; - int is_perm, has_missing_peer; - - db_rep = dbenv->rep_handle; - - if (db_rep->perm_policy == DB_REPMGR_ACKS_NONE) - return (TRUE); - - nsites = npeers = 0; - has_missing_peer = FALSE; - for (eid = 0; eid < db_rep->site_cnt; eid++) { - site = SITE_FROM_EID(eid); - if (site->priority == -1) { - /* - * Never connected to this site: since we can't know - * whether it's a peer, assume the worst. - */ - has_missing_peer = TRUE; - continue; - } - - if (log_compare(&site->max_ack, lsnp) >= 0) { - nsites++; - if (site->priority > 0) - npeers++; - } else { - /* This site hasn't ack'ed the message. */ - if (site->priority > 0) - has_missing_peer = TRUE; - } - } - - switch (db_rep->perm_policy) { - case DB_REPMGR_ACKS_ONE: - is_perm = (nsites >= 1); - break; - case DB_REPMGR_ACKS_ONE_PEER: - is_perm = (npeers >= 1); - break; - case DB_REPMGR_ACKS_QUORUM: - /* - * The minimum number of acks necessary to ensure that the - * transaction is durable if an election is held (given that we - * always conduct elections according to the standard, - * recommended practice of requiring votes from a majority of - * sites). - */ - if (__repmgr_get_nsites(db_rep) == 2) { - /* - * A group of 2 sites is, as always, a special case. - * For a transaction to be durable the other site has to - * have received it. - */ - is_perm = (npeers >= 1); - } else - is_perm = (npeers >= (__repmgr_get_nsites(db_rep)-1)/2); - break; - case DB_REPMGR_ACKS_ALL: - /* Adjust by 1, since get_nsites includes local site. */ - is_perm = (nsites >= __repmgr_get_nsites(db_rep) - 1); - break; - case DB_REPMGR_ACKS_ALL_PEERS: - if (db_rep->site_cnt < __repmgr_get_nsites(db_rep) - 1) { - /* Assume missing site might be a peer. */ - has_missing_peer = TRUE; - } - is_perm = !has_missing_peer; - break; - default: - is_perm = FALSE; - (void)__db_unknown_path(dbenv, "__repmgr_is_permanent"); - } - return (is_perm); -} - -/* - * Abandons a connection, to recover from an error. Upon entry the conn struct - * must be on the connections list. - * - * If the 'do_close' flag is true, we do the whole job; the clean-up includes - * removing the struct from the list and freeing all its memory, so upon return - * the caller must not refer to it any further. Otherwise, we merely mark the - * connection for clean-up later by the main thread. - * - * PUBLIC: int __repmgr_bust_connection __P((DB_ENV *, - * PUBLIC: REPMGR_CONNECTION *, int)); - * - * !!! - * Caller holds mutex. - */ -int -__repmgr_bust_connection(dbenv, conn, do_close) - DB_ENV *dbenv; - REPMGR_CONNECTION *conn; - int do_close; -{ - DB_REP *db_rep; - int connecting, ret, eid; - - db_rep = dbenv->rep_handle; - ret = 0; - - DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections)); - eid = conn->eid; - connecting = F_ISSET(conn, CONN_CONNECTING); - if (do_close) - __repmgr_cleanup_connection(dbenv, conn); - else { - F_SET(conn, CONN_DEFUNCT); - conn->eid = -1; - } - - /* - * When we first accepted the incoming connection, we set conn->eid to - * -1 to indicate that we didn't yet know what site it might be from. - * If we then get here because we later decide it was a redundant - * connection, the following scary stuff will correctly not happen. - */ - if (IS_VALID_EID(eid)) { - /* schedule_connection_attempt wakes the main thread. */ - if ((ret = __repmgr_schedule_connection_attempt( - dbenv, (u_int)eid, FALSE)) != 0) - return (ret); - - /* - * If this connection had gotten no further than the CONNECTING - * state, this can't count as a loss of connection to the - * master. - */ - if (!connecting && eid == db_rep->master_eid) { - (void)__memp_set_config( - dbenv, DB_MEMP_SYNC_INTERRUPT, 1); - if ((ret = __repmgr_init_election( - dbenv, ELECT_FAILURE_ELECTION)) != 0) - return (ret); - } - } else if (!do_close) { - /* - * One way or another, make sure the main thread is poked, so - * that we do the deferred clean-up. - */ - ret = __repmgr_wake_main_thread(dbenv); - } - return (ret); -} - -/* - * PUBLIC: void __repmgr_cleanup_connection - * PUBLIC: __P((DB_ENV *, REPMGR_CONNECTION *)); - */ -void -__repmgr_cleanup_connection(dbenv, conn) - DB_ENV *dbenv; - REPMGR_CONNECTION *conn; -{ - DB_REP *db_rep; - QUEUED_OUTPUT *out; - REPMGR_FLAT *msg; - DBT *dbt; - - db_rep = dbenv->rep_handle; - - TAILQ_REMOVE(&db_rep->connections, conn, entries); - if (conn->fd != INVALID_SOCKET) { - (void)closesocket(conn->fd); -#ifdef DB_WIN32 - (void)WSACloseEvent(conn->event_object); -#endif - } - - /* - * Deallocate any input and output buffers we may have. - */ - if (conn->reading_phase == DATA_PHASE) { - if (conn->msg_type == REPMGR_REP_MESSAGE) - __os_free(dbenv, conn->input.rep_message); - else { - dbt = &conn->input.repmgr_msg.cntrl; - __os_free(dbenv, dbt->data); - dbt = &conn->input.repmgr_msg.rec; - if (dbt->size > 0) - __os_free(dbenv, dbt->data); - } - } - while (!STAILQ_EMPTY(&conn->outbound_queue)) { - out = STAILQ_FIRST(&conn->outbound_queue); - STAILQ_REMOVE_HEAD(&conn->outbound_queue, entries); - msg = out->msg; - if (--msg->ref_count <= 0) - __os_free(dbenv, msg); - __os_free(dbenv, out); - } - - __os_free(dbenv, conn); -} - -static int -enqueue_msg(dbenv, conn, msg, offset) - DB_ENV *dbenv; - REPMGR_CONNECTION *conn; - struct sending_msg *msg; - size_t offset; -{ - QUEUED_OUTPUT *q_element; - int ret; - - if (msg->fmsg == NULL && ((ret = flatten(dbenv, msg)) != 0)) - return (ret); - if ((ret = __os_malloc(dbenv, sizeof(QUEUED_OUTPUT), &q_element)) != 0) - return (ret); - q_element->msg = msg->fmsg; - msg->fmsg->ref_count++; /* encapsulation would be sweeter */ - q_element->offset = offset; - - /* Put it on the connection's outbound queue. */ - STAILQ_INSERT_TAIL(&conn->outbound_queue, q_element, entries); - conn->out_queue_length++; - return (0); -} - -/* - * The 'rec' DBT can be NULL, in which case we treat it like a zero-length DBT. - * But 'control' is always present. - */ -static void -setup_sending_msg(msg, type, control, rec) - struct sending_msg *msg; - u_int type; - const DBT *control, *rec; -{ - u_int32_t rec_size; - - /* - * The wire protocol is documented in a comment at the top of this - * module. - */ - __repmgr_iovec_init(&msg->iovecs); - msg->type = type; - __repmgr_add_buffer(&msg->iovecs, &msg->type, sizeof(msg->type)); - - msg->control_size_buf = htonl(control->size); - __repmgr_add_buffer(&msg->iovecs, - &msg->control_size_buf, sizeof(msg->control_size_buf)); - - rec_size = rec == NULL ? 0 : rec->size; - msg->rec_size_buf = htonl(rec_size); - __repmgr_add_buffer( - &msg->iovecs, &msg->rec_size_buf, sizeof(msg->rec_size_buf)); - - if (control->size > 0) - __repmgr_add_dbt(&msg->iovecs, control); - - if (rec_size > 0) - __repmgr_add_dbt(&msg->iovecs, rec); - - msg->fmsg = NULL; -} - -/* - * Convert a message stored as iovec pointers to various pieces, into flattened - * form, by copying all the pieces, and then make the iovec just point to the - * new simplified form. - */ -static int -flatten(dbenv, msg) - DB_ENV *dbenv; - struct sending_msg *msg; -{ - u_int8_t *p; - size_t msg_size; - int i, ret; - - DB_ASSERT(dbenv, msg->fmsg == NULL); - - msg_size = msg->iovecs.total_bytes; - if ((ret = __os_malloc(dbenv, sizeof(*msg->fmsg) + msg_size, - &msg->fmsg)) != 0) - return (ret); - msg->fmsg->length = msg_size; - msg->fmsg->ref_count = 0; - p = &msg->fmsg->data[0]; - - for (i = 0; i < msg->iovecs.count; i++) { - memcpy(p, msg->iovecs.vectors[i].iov_base, - msg->iovecs.vectors[i].iov_len); - p = &p[msg->iovecs.vectors[i].iov_len]; - } - __repmgr_iovec_init(&msg->iovecs); - __repmgr_add_buffer(&msg->iovecs, &msg->fmsg->data[0], msg_size); - return (0); -} - -/* - * PUBLIC: int __repmgr_find_site __P((DB_ENV *, const char *, u_int)); - */ -int -__repmgr_find_site(dbenv, host, port) - DB_ENV *dbenv; - const char *host; - u_int port; -{ - DB_REP *db_rep; - REPMGR_SITE *site; - u_int i; - - db_rep = dbenv->rep_handle; - for (i = 0; i < db_rep->site_cnt; i++) { - site = &db_rep->sites[i]; - - if (strcmp(site->net_addr.host, host) == 0 && - site->net_addr.port == port) - return ((int)i); - } - - return (-1); -} - -/* - * Stash a copy of the given host name and port number into a convenient data - * structure so that we can save it permanently. This is kind of like a - * constructor for a netaddr object, except that the caller supplies the memory - * for the base struct (though not the subordinate attachments). - * - * All inputs are assumed to have been already validated. - * - * PUBLIC: int __repmgr_pack_netaddr __P((DB_ENV *, const char *, - * PUBLIC: u_int, ADDRINFO *, repmgr_netaddr_t *)); - */ -int -__repmgr_pack_netaddr(dbenv, host, port, list, addr) - DB_ENV *dbenv; - const char *host; - u_int port; - ADDRINFO *list; - repmgr_netaddr_t *addr; -{ - int ret; - - DB_ASSERT(dbenv, host != NULL); - - if ((ret = __os_strdup(dbenv, host, &addr->host)) != 0) - return (ret); - addr->port = (u_int16_t)port; - addr->address_list = list; - addr->current = NULL; - return (0); -} - -/* - * PUBLIC: int __repmgr_getaddr __P((DB_ENV *, - * PUBLIC: const char *, u_int, int, ADDRINFO **)); - */ -int -__repmgr_getaddr(dbenv, host, port, flags, result) - DB_ENV *dbenv; - const char *host; - u_int port; - int flags; /* Matches struct addrinfo declaration. */ - ADDRINFO **result; -{ - ADDRINFO *answer, hints; - char buffer[10]; /* 2**16 fits in 5 digits. */ -#ifdef DB_WIN32 - int ret; -#endif - - /* - * Ports are really 16-bit unsigned values, but it's too painful to - * push that type through the API. - */ - if (port > UINT16_MAX) { - __db_errx(dbenv, "port %u larger than max port %u", - port, UINT16_MAX); - return (EINVAL); - } - -#ifdef DB_WIN32 - if (!dbenv->rep_handle->wsa_inited && - (ret = __repmgr_wsa_init(dbenv)) != 0) - return (ret); -#endif - - memset(&hints, 0, sizeof(hints)); - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; - hints.ai_flags = flags; - (void)snprintf(buffer, sizeof(buffer), "%u", port); - - /* - * Although it's generally bad to discard error information, the return - * code from __db_getaddrinfo is undependable. Our callers at least - * would like to be able to distinguish errors in getaddrinfo (which we - * want to consider to be re-tryable), from other failure (e.g., EINVAL, - * above). - */ - if (__db_getaddrinfo(dbenv, host, port, buffer, &hints, &answer) != 0) - return (DB_REP_UNAVAIL); - *result = answer; - - return (0); -} - -/* - * Adds a new site to our array of known sites (unless it already exists), - * and schedules it for immediate connection attempt. Whether it exists or not, - * we set newsitep, either to the already existing site, or to the newly created - * site. Unless newsitep is passed in as NULL, which is allowed. - * - * PUBLIC: int __repmgr_add_site - * PUBLIC: __P((DB_ENV *, const char *, u_int, REPMGR_SITE **)); - * - * !!! - * Caller is expected to hold the mutex. - */ -int -__repmgr_add_site(dbenv, host, port, newsitep) - DB_ENV *dbenv; - const char *host; - u_int port; - REPMGR_SITE **newsitep; -{ - DB_REP *db_rep; - ADDRINFO *address_list; - repmgr_netaddr_t addr; - REPMGR_SITE *site; - int ret, eid; - - ret = 0; - db_rep = dbenv->rep_handle; - - if (IS_VALID_EID(eid = __repmgr_find_site(dbenv, host, port))) { - site = SITE_FROM_EID(eid); - ret = EEXIST; - goto out; - } - - if ((ret = __repmgr_getaddr( - dbenv, host, port, 0, &address_list)) == DB_REP_UNAVAIL) { - /* Allow re-tryable errors. We'll try again later. */ - address_list = NULL; - } else if (ret != 0) - return (ret); - - if ((ret = __repmgr_pack_netaddr( - dbenv, host, port, address_list, &addr)) != 0) { - __db_freeaddrinfo(dbenv, address_list); - return (ret); - } - - if ((ret = __repmgr_new_site(dbenv, &site, &addr, SITE_IDLE)) != 0) { - __repmgr_cleanup_netaddr(dbenv, &addr); - return (ret); - } - - if (db_rep->selector != NULL && - (ret = __repmgr_schedule_connection_attempt( - dbenv, (u_int)EID_FROM_SITE(site), TRUE)) != 0) - return (ret); - - /* Note that we should only come here for success and EEXIST. */ -out: - if (newsitep != NULL) - *newsitep = site; - return (ret); -} - -/* - * Initializes net-related memory in the db_rep handle. - * - * PUBLIC: int __repmgr_net_create __P((DB_ENV *, DB_REP *)); - */ -int -__repmgr_net_create(dbenv, db_rep) - DB_ENV *dbenv; - DB_REP *db_rep; -{ - COMPQUIET(dbenv, NULL); - - db_rep->listen_fd = INVALID_SOCKET; - db_rep->master_eid = DB_EID_INVALID; - - TAILQ_INIT(&db_rep->connections); - TAILQ_INIT(&db_rep->retries); - - return (0); -} - -/* - * listen_socket_init -- - * Initialize a socket for listening. Sets - * a file descriptor for the socket, ready for an accept() call - * in a thread that we're happy to let block. - * - * PUBLIC: int __repmgr_listen __P((DB_ENV *)); - */ -int -__repmgr_listen(dbenv) - DB_ENV *dbenv; -{ - DB_REP *db_rep; - ADDRINFO *ai; - char *why; - int sockopt, ret; - socket_t s; - - db_rep = dbenv->rep_handle; - - /* Use OOB value as sentinel to show no socket open. */ - s = INVALID_SOCKET; - ai = ADDR_LIST_FIRST(&db_rep->my_addr); - - /* - * Given the assert is correct, we execute the loop at least once, which - * means 'why' will have been set by the time it's needed. But I guess - * lint doesn't know about DB_ASSERT. - */ - COMPQUIET(why, ""); - DB_ASSERT(dbenv, ai != NULL); - for (; ai != NULL; ai = ADDR_LIST_NEXT(&db_rep->my_addr)) { - - if ((s = socket(ai->ai_family, - ai->ai_socktype, ai->ai_protocol)) == INVALID_SOCKET) { - why = "can't create listen socket"; - continue; - } - - /* - * When testing, it's common to kill and restart regularly. On - * some systems, this causes bind to fail with "address in use" - * errors unless this option is set. - */ - sockopt = 1; - if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (sockopt_t)&sockopt, - sizeof(sockopt)) != 0) { - why = "can't set REUSEADDR socket option"; - break; - } - - if (bind(s, ai->ai_addr, (socklen_t)ai->ai_addrlen) != 0) { - why = "can't bind socket to listening address"; - (void)closesocket(s); - s = INVALID_SOCKET; - continue; - } - - if (listen(s, 5) != 0) { - why = "listen()"; - break; - } - - if ((ret = __repmgr_set_nonblocking(s)) != 0) { - __db_err(dbenv, ret, "can't unblock listen socket"); - goto clean; - } - - db_rep->listen_fd = s; - return (0); - } - - ret = net_errno; - __db_err(dbenv, ret, why); -clean: if (s != INVALID_SOCKET) - (void)closesocket(s); - return (ret); -} - -/* - * PUBLIC: int __repmgr_net_close __P((DB_ENV *)); - */ -int -__repmgr_net_close(dbenv) - DB_ENV *dbenv; -{ - DB_REP *db_rep; - REPMGR_CONNECTION *conn; -#ifndef DB_WIN32 - struct sigaction sigact; -#endif - int ret; - - db_rep = dbenv->rep_handle; - if (db_rep->listen_fd == INVALID_SOCKET) - return (0); - - TAILQ_FOREACH(conn, &db_rep->connections, entries) { - if (conn->fd != INVALID_SOCKET) { - (void)closesocket(conn->fd); - conn->fd = INVALID_SOCKET; -#ifdef DB_WIN32 - (void)WSACloseEvent(conn->event_object); -#endif - } - } - - ret = 0; - if (closesocket(db_rep->listen_fd) == SOCKET_ERROR) - ret = net_errno; - -#ifdef DB_WIN32 - /* Shut down the Windows sockets DLL. */ - if (WSACleanup() == SOCKET_ERROR && ret == 0) - ret = net_errno; - db_rep->wsa_inited = FALSE; -#else - /* Restore original SIGPIPE handling configuration. */ - if (db_rep->chg_sig_handler) { - memset(&sigact, 0, sizeof(sigact)); - sigact.sa_handler = SIG_DFL; - if (sigaction(SIGPIPE, &sigact, NULL) == -1 && ret == 0) - ret = errno; - } -#endif - db_rep->listen_fd = INVALID_SOCKET; - return (ret); -} - -/* - * PUBLIC: void __repmgr_net_destroy __P((DB_ENV *, DB_REP *)); - */ -void -__repmgr_net_destroy(dbenv, db_rep) - DB_ENV *dbenv; - DB_REP *db_rep; -{ - REPMGR_CONNECTION *conn; - REPMGR_RETRY *retry; - REPMGR_SITE *site; - u_int i; - - __repmgr_cleanup_netaddr(dbenv, &db_rep->my_addr); - - if (db_rep->sites == NULL) - return; - - while (!TAILQ_EMPTY(&db_rep->retries)) { - retry = TAILQ_FIRST(&db_rep->retries); - TAILQ_REMOVE(&db_rep->retries, retry, entries); - __os_free(dbenv, retry); - } - - while (!TAILQ_EMPTY(&db_rep->connections)) { - conn = TAILQ_FIRST(&db_rep->connections); - __repmgr_cleanup_connection(dbenv, conn); - } - - for (i = 0; i < db_rep->site_cnt; i++) { - site = &db_rep->sites[i]; - __repmgr_cleanup_netaddr(dbenv, &site->net_addr); - } - __os_free(dbenv, db_rep->sites); - db_rep->sites = NULL; -} diff --git a/db/repmgr/repmgr_posix.c b/db/repmgr/repmgr_posix.c deleted file mode 100644 index fde88262d..000000000 --- a/db/repmgr/repmgr_posix.c +++ /dev/null @@ -1,691 +0,0 @@ -/*- - * See the file LICENSE for redistribution information. - * - * Copyright (c) 2005,2007 Oracle. All rights reserved. - * - * $Id: repmgr_posix.c,v 1.29 2007/06/11 18:29:34 alanb Exp $ - */ - -#include "db_config.h" - -#define __INCLUDE_NETWORKING 1 -#define __INCLUDE_SELECT_H 1 -#include "db_int.h" - -/* - * A very rough guess at the maximum stack space one of our threads could ever - * need, which we hope is plenty conservative. This can be patched in the field - * if necessary. - */ -#ifdef _POSIX_THREAD_ATTR_STACKSIZE -size_t __repmgr_guesstimated_max = (128 * 1024); -#endif - -static int finish_connecting __P((DB_ENV *, REPMGR_CONNECTION *)); - -/* - * Starts the thread described in the argument, and stores the resulting thread - * ID therein. - * - * PUBLIC: int __repmgr_thread_start __P((DB_ENV *, REPMGR_RUNNABLE *)); - */ -int -__repmgr_thread_start(dbenv, runnable) - DB_ENV *dbenv; - REPMGR_RUNNABLE *runnable; -{ - pthread_attr_t *attrp; -#ifdef _POSIX_THREAD_ATTR_STACKSIZE - pthread_attr_t attributes; - size_t size; - int ret; -#endif - - runnable->finished = FALSE; - -#ifdef _POSIX_THREAD_ATTR_STACKSIZE - attrp = &attributes; - if ((ret = pthread_attr_init(&attributes)) != 0) { - __db_err(dbenv, - ret, "pthread_attr_init in repmgr_thread_start"); - return (ret); - } - - /* - * On a 64-bit machine it seems reasonable that we could need twice as - * much stack space as we did on a 32-bit machine. - */ - size = __repmgr_guesstimated_max; - if (sizeof(size_t) > 4) - size *= 2; -#ifdef PTHREAD_STACK_MIN - if (size < PTHREAD_STACK_MIN) - size = PTHREAD_STACK_MIN; -#endif - if ((ret = pthread_attr_setstacksize(&attributes, size)) != 0) { - __db_err(dbenv, - ret, "pthread_attr_setstacksize in repmgr_thread_start"); - return (ret); - } -#else - attrp = NULL; -#endif - - return (pthread_create(&runnable->thread_id, attrp, - runnable->run, dbenv)); -} - -/* - * PUBLIC: int __repmgr_thread_join __P((REPMGR_RUNNABLE *)); - */ -int -__repmgr_thread_join(thread) - REPMGR_RUNNABLE *thread; -{ - return (pthread_join(thread->thread_id, NULL)); -} - -/* - * PUBLIC: int __repmgr_set_nonblocking __P((socket_t)); - */ -int -__repmgr_set_nonblocking(fd) - socket_t fd; -{ - int flags; - - if ((flags = fcntl(fd, F_GETFL, 0)) < 0) - return (errno); - if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) - return (errno); - return (0); -} - -/* - * PUBLIC: int __repmgr_wake_waiting_senders __P((DB_ENV *)); - * - * Wake any send()-ing threads waiting for an acknowledgement. - * - * !!! - * Caller must hold the db_rep->mutex, if this thread synchronization is to work - * properly. - */ -int -__repmgr_wake_waiting_senders(dbenv) - DB_ENV *dbenv; -{ - return (pthread_cond_broadcast(&dbenv->rep_handle->ack_condition)); -} - -/* - * PUBLIC: int __repmgr_await_ack __P((DB_ENV *, const DB_LSN *)); - * - * Waits (a limited time) for configured number of remote sites to ack the given - * LSN. - * - * !!! - * Caller must hold repmgr->mutex. - */ -int -__repmgr_await_ack(dbenv, lsnp) - DB_ENV *dbenv; - const DB_LSN *lsnp; -{ - DB_REP *db_rep; - struct timespec deadline; - int ret, timed; - - db_rep = dbenv->rep_handle; - - if ((timed = (db_rep->ack_timeout > 0))) - __repmgr_compute_wait_deadline(dbenv, &deadline, - db_rep->ack_timeout); - else - COMPQUIET(deadline.tv_sec, 0); - - while (!__repmgr_is_permanent(dbenv, lsnp)) { - if (timed) - ret = pthread_cond_timedwait(&db_rep->ack_condition, - &db_rep->mutex, &deadline); - else - ret = pthread_cond_wait(&db_rep->ack_condition, - &db_rep->mutex); - if (db_rep->finished) - return (DB_REP_UNAVAIL); - if (ret != 0) - return (ret); - } - return (0); -} - -/* - * Computes a deadline time a certain distance into the future, in a form - * suitable for the pthreads timed wait operation. Curiously, that call uses - * nano-second resolution; elsewhere we use microseconds. - * - * PUBLIC: void __repmgr_compute_wait_deadline __P((DB_ENV*, - * PUBLIC: struct timespec *, db_timeout_t)); - */ -void -__repmgr_compute_wait_deadline(dbenv, result, wait) - DB_ENV *dbenv; - struct timespec *result; - db_timeout_t wait; -{ - db_timespec v; - - /* - * Start with "now"; then add the "wait" offset. - * - * A db_timespec is the same as a "struct timespec" so we can pass - * result directly to the underlying Berkeley DB OS routine. - */ - __os_gettime(dbenv, (db_timespec *)result); - - /* Convert microsecond wait to a timespec. */ - DB_TIMEOUT_TO_TIMESPEC(wait, &v); - - timespecadd(result, &v); -} - -/* - * PUBLIC: int __repmgr_init_sync __P((DB_ENV *, DB_REP *)); - * - * Allocate/initialize all data necessary for thread synchronization. This - * should be an all-or-nothing affair. Other than here and in _close_sync there - * should never be a time when these resources aren't either all allocated or - * all freed. If that's true, then we can safely use the values of the file - * descriptor(s) to keep track of which it is. - */ -int -__repmgr_init_sync(dbenv, db_rep) - DB_ENV *dbenv; - DB_REP *db_rep; -{ - int ret, mutex_inited, ack_inited, elect_inited, queue_inited, - file_desc[2]; - - COMPQUIET(dbenv, NULL); - - mutex_inited = ack_inited = elect_inited = queue_inited = FALSE; - - if ((ret = pthread_mutex_init(&db_rep->mutex, NULL)) != 0) - goto err; - mutex_inited = TRUE; - - if ((ret = pthread_cond_init(&db_rep->ack_condition, NULL)) != 0) - goto err; - ack_inited = TRUE; - - if ((ret = pthread_cond_init(&db_rep->check_election, NULL)) != 0) - goto err; - elect_inited = TRUE; - - if ((ret = pthread_cond_init(&db_rep->queue_nonempty, NULL)) != 0) - goto err; - queue_inited = TRUE; - - if ((ret = pipe(file_desc)) == -1) { - ret = errno; - goto err; - } - - db_rep->read_pipe = file_desc[0]; - db_rep->write_pipe = file_desc[1]; - return (0); -err: - if (queue_inited) - (void)pthread_cond_destroy(&db_rep->queue_nonempty); - if (elect_inited) - (void)pthread_cond_destroy(&db_rep->check_election); - if (ack_inited) - (void)pthread_cond_destroy(&db_rep->ack_condition); - if (mutex_inited) - (void)pthread_mutex_destroy(&db_rep->mutex); - db_rep->read_pipe = db_rep->write_pipe = -1; - - return (ret); -} - -/* - * PUBLIC: int __repmgr_close_sync __P((DB_ENV *)); - * - * Frees the thread synchronization data within a repmgr struct, in a - * platform-specific way. - */ -int -__repmgr_close_sync(dbenv) - DB_ENV *dbenv; -{ - DB_REP *db_rep; - int ret, t_ret; - - db_rep = dbenv->rep_handle; - - if (!(REPMGR_SYNC_INITED(db_rep))) - return (0); - - ret = pthread_cond_destroy(&db_rep->queue_nonempty); - - if ((t_ret = pthread_cond_destroy(&db_rep->check_election)) != 0 && - ret == 0) - ret = t_ret; - - if ((t_ret = pthread_cond_destroy(&db_rep->ack_condition)) != 0 && - ret == 0) - ret = t_ret; - - if ((t_ret = pthread_mutex_destroy(&db_rep->mutex)) != 0 && - ret == 0) - ret = t_ret; - - if (close(db_rep->read_pipe) == -1 && ret == 0) - ret = errno; - if (close(db_rep->write_pipe) == -1 && ret == 0) - ret = errno; - - db_rep->read_pipe = db_rep->write_pipe = -1; - return (ret); -} - -/* - * Performs net-related resource initialization other than memory initialization - * and allocation. A valid db_rep->listen_fd acts as the "all-or-nothing" - * sentinel signifying that these resources are allocated. - * - * PUBLIC: int __repmgr_net_init __P((DB_ENV *, DB_REP *)); - */ -int -__repmgr_net_init(dbenv, db_rep) - DB_ENV *dbenv; - DB_REP *db_rep; -{ - int ret; - struct sigaction sigact; - - if ((ret = __repmgr_listen(dbenv)) != 0) - return (ret); - - /* - * Make sure we're not ignoring SIGPIPE, 'cuz otherwise we'd be killed - * just for trying to write onto a socket that had been reset. - */ - if (sigaction(SIGPIPE, NULL, &sigact) == -1) { - ret = errno; - __db_err(dbenv, ret, "can't access signal handler"); - goto err; - } - /* - * If we need to change the sig handler, do so, and also set a flag so - * that we remember we did. - */ - if ((db_rep->chg_sig_handler = (sigact.sa_handler == SIG_DFL))) { - sigact.sa_handler = SIG_IGN; - sigact.sa_flags = 0; - if (sigaction(SIGPIPE, &sigact, NULL) == -1) { - ret = errno; - __db_err(dbenv, ret, "can't access signal handler"); - goto err; - } - } - return (0); - -err: - (void)closesocket(db_rep->listen_fd); - db_rep->listen_fd = INVALID_SOCKET; - return (ret); -} - -/* - * PUBLIC: int __repmgr_lock_mutex __P((mgr_mutex_t *)); - */ -int -__repmgr_lock_mutex(mutex) - mgr_mutex_t *mutex; -{ - return (pthread_mutex_lock(mutex)); -} - -/* - * PUBLIC: int __repmgr_unlock_mutex __P((mgr_mutex_t *)); - */ -int -__repmgr_unlock_mutex(mutex) - mgr_mutex_t *mutex; -{ - return (pthread_mutex_unlock(mutex)); -} - -/* - * Signals a condition variable. - * - * !!! - * Caller must hold mutex. - * - * PUBLIC: int __repmgr_signal __P((cond_var_t *)); - */ -int -__repmgr_signal(v) - cond_var_t *v; -{ - return (pthread_cond_broadcast(v)); -} - -/* - * PUBLIC: int __repmgr_wake_main_thread __P((DB_ENV*)); - */ -int -__repmgr_wake_main_thread(dbenv) - DB_ENV *dbenv; -{ - DB_REP *db_rep; - u_int8_t any_value; - - COMPQUIET(any_value, 0); - db_rep = dbenv->rep_handle; - - /* - * It doesn't matter what byte value we write. Just the appearance of a - * byte in the stream is enough to wake up the select() thread reading - * the pipe. - */ - if (write(db_rep->write_pipe, &any_value, 1) == -1) - return (errno); - return (0); -} - -/* - * PUBLIC: int __repmgr_writev __P((socket_t, db_iovec_t *, int, size_t *)); - */ -int -__repmgr_writev(fd, iovec, buf_count, byte_count_p) - socket_t fd; - db_iovec_t *iovec; - int buf_count; - size_t *byte_count_p; -{ - int nw; - - if ((nw = writev(fd, iovec, buf_count)) == -1) - return (errno); - *byte_count_p = (size_t)nw; - return (0); -} - -/* - * PUBLIC: int __repmgr_readv __P((socket_t, db_iovec_t *, int, size_t *)); - */ -int -__repmgr_readv(fd, iovec, buf_count, byte_count_p) - socket_t fd; - db_iovec_t *iovec; - int buf_count; - size_t *byte_count_p; -{ - ssize_t nw; - - if ((nw = readv(fd, iovec, buf_count)) == -1) - return (errno); - *byte_count_p = (size_t)nw; - return (0); -} - -/* - * PUBLIC: int __repmgr_select_loop __P((DB_ENV *)); - */ -int -__repmgr_select_loop(dbenv) - DB_ENV *dbenv; -{ - struct timeval select_timeout, *select_timeout_p; - DB_REP *db_rep; - REPMGR_CONNECTION *conn, *next; - REPMGR_RETRY *retry; - db_timespec timeout; - fd_set reads, writes; - int ret, flow_control, maxfd, nready; - u_int8_t buf[10]; /* arbitrary size */ - - flow_control = FALSE; - - db_rep = dbenv->rep_handle; - /* - * Almost this entire thread operates while holding the mutex. But note - * that it never blocks, except in the call to select() (which is the - * one place we relinquish the mutex). - */ - LOCK_MUTEX(db_rep->mutex); - if ((ret = __repmgr_first_try_connections(dbenv)) != 0) - goto out; - for (;;) { - FD_ZERO(&reads); - FD_ZERO(&writes); - - /* - * Always ask for input on listening socket and signalling - * pipe. - */ - FD_SET((u_int)db_rep->listen_fd, &reads); - maxfd = db_rep->listen_fd; - - FD_SET((u_int)db_rep->read_pipe, &reads); - if (db_rep->read_pipe > maxfd) - maxfd = db_rep->read_pipe; - - /* - * Examine all connections to see what sort of I/O to ask for on - * each one. - */ - TAILQ_FOREACH(conn, &db_rep->connections, entries) { - if (F_ISSET(conn, CONN_CONNECTING)) { - FD_SET((u_int)conn->fd, &reads); - FD_SET((u_int)conn->fd, &writes); - if (conn->fd > maxfd) - maxfd = conn->fd; - continue; - } - - if (!STAILQ_EMPTY(&conn->outbound_queue)) { - FD_SET((u_int)conn->fd, &writes); - if (conn->fd > maxfd) - maxfd = conn->fd; - } - /* - * If we haven't yet gotten site's handshake, then read - * from it even if we're flow-controlling. - */ - if (!flow_control || !IS_VALID_EID(conn->eid)) { - FD_SET((u_int)conn->fd, &reads); - if (conn->fd > maxfd) - maxfd = conn->fd; - } - } - /* - * Decide how long to wait based on when it will next be time to - * retry an idle connection. (List items are in order, so we - * only have to examine the first one.) - */ - if (TAILQ_EMPTY(&db_rep->retries)) - select_timeout_p = NULL; - else { - retry = TAILQ_FIRST(&db_rep->retries); - - __repmgr_timespec_diff_now( - dbenv, &retry->time, &timeout); - - /* Convert the timespec to a timeval. */ - select_timeout.tv_sec = timeout.tv_sec; - select_timeout.tv_usec = timeout.tv_nsec / NS_PER_US; - select_timeout_p = &select_timeout; - } - - UNLOCK_MUTEX(db_rep->mutex); - - if ((ret = select(maxfd + 1, - &reads, &writes, NULL, select_timeout_p)) == -1) { - switch (ret = errno) { - case EINTR: - case EWOULDBLOCK: - LOCK_MUTEX(db_rep->mutex); - continue; /* simply retry */ - default: - __db_err(dbenv, ret, "select"); - return (ret); - } - } - nready = ret; - - LOCK_MUTEX(db_rep->mutex); - - /* - * The first priority thing we must do is to clean up any - * pending defunct connections. Otherwise, if they have any - * lingering pending input, we get very confused if we try to - * process it. - * - * The TAILQ_FOREACH macro would be suitable here, except that - * it doesn't allow unlinking the current element, which is - * needed for cleanup_connection. - */ - for (conn = TAILQ_FIRST(&db_rep->connections); - conn != NULL; - conn = next) { - next = TAILQ_NEXT(conn, entries); - if (F_ISSET(conn, CONN_DEFUNCT)) - __repmgr_cleanup_connection(dbenv, conn); - } - - if ((ret = __repmgr_retry_connections(dbenv)) != 0) - goto out; - if (nready == 0) - continue; - - /* - * Traverse the linked list. (Again, like TAILQ_FOREACH, except - * that we need the ability to unlink an element along the way.) - */ - for (conn = TAILQ_FIRST(&db_rep->connections); - conn != NULL; - conn = next) { - next = TAILQ_NEXT(conn, entries); - if (F_ISSET(conn, CONN_CONNECTING)) { - if (FD_ISSET((u_int)conn->fd, &reads) || - FD_ISSET((u_int)conn->fd, &writes)) { - if ((ret = finish_connecting(dbenv, - conn)) == DB_REP_UNAVAIL) { - if ((ret = - __repmgr_bust_connection( - dbenv, conn, TRUE)) != 0) - goto out; - } else if (ret != 0) - goto out; - } - continue; - } - - /* - * Here, the site is connected, and the FD_SET's are - * valid. - */ - if (FD_ISSET((u_int)conn->fd, &writes)) { - if ((ret = __repmgr_write_some( - dbenv, conn)) == DB_REP_UNAVAIL) { - if ((ret = - __repmgr_bust_connection(dbenv, - conn, TRUE)) != 0) - goto out; - continue; - } else if (ret != 0) - goto out; - } - - if (!flow_control && - FD_ISSET((u_int)conn->fd, &reads)) { - if ((ret = __repmgr_read_from_site(dbenv, conn)) - == DB_REP_UNAVAIL) { - if ((ret = - __repmgr_bust_connection(dbenv, - conn, TRUE)) != 0) - goto out; - continue; - } else if (ret != 0) - goto out; - } - } - - /* - * Read any bytes in the signalling pipe. Note that we don't - * actually need to do anything with them; they're just there to - * wake us up when necessary. - */ - if (FD_ISSET((u_int)db_rep->read_pipe, &reads)) { - if (read(db_rep->read_pipe, buf, sizeof(buf)) <= 0) { - ret = errno; - goto out; - } else if (db_rep->finished) { - ret = 0; - goto out; - } - } - if (FD_ISSET((u_int)db_rep->listen_fd, &reads) && - (ret = __repmgr_accept(dbenv)) != 0) - goto out; - } -out: - UNLOCK_MUTEX(db_rep->mutex); - return (ret); -} - -static int -finish_connecting(dbenv, conn) - DB_ENV *dbenv; - REPMGR_CONNECTION *conn; -{ - DB_REP *db_rep; - REPMGR_SITE *site; - socklen_t len; - SITE_STRING_BUFFER buffer; - u_int eid; - int error, ret; - - len = sizeof(error); - if (getsockopt( - conn->fd, SOL_SOCKET, SO_ERROR, (sockopt_t)&error, &len) < 0) - goto err_rpt; - if (error) { - errno = error; - goto err_rpt; - } - - F_CLR(conn, CONN_CONNECTING); - return (__repmgr_send_handshake(dbenv, conn)); - -err_rpt: - db_rep = dbenv->rep_handle; - - DB_ASSERT(dbenv, IS_VALID_EID(conn->eid)); - eid = (u_int)conn->eid; - - site = SITE_FROM_EID(eid); - __db_err(dbenv, errno, - "connecting to %s", __repmgr_format_site_loc(site, buffer)); - - /* If we've exhausted the list of possible addresses, give up. */ - if (ADDR_LIST_NEXT(&site->net_addr) == NULL) - return (DB_REP_UNAVAIL); - - /* - * This is just like a little mini-"bust_connection", except that we - * don't reschedule for later, 'cuz we're just about to try again right - * now. - * - * !!! - * Which means this must only be called on the select() thread, since - * only there are we allowed to actually close a connection. - */ - DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections)); - __repmgr_cleanup_connection(dbenv, conn); - ret = __repmgr_connect_site(dbenv, eid); - DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL); - return (ret); -} diff --git a/db/repmgr/repmgr_queue.c b/db/repmgr/repmgr_queue.c deleted file mode 100644 index af9c84e5a..000000000 --- a/db/repmgr/repmgr_queue.c +++ /dev/null @@ -1,155 +0,0 @@ -/*- - * See the file LICENSE for redistribution information. - * - * Copyright (c) 2006,2007 Oracle. All rights reserved. - * - * $Id: repmgr_queue.c,v 1.9 2007/05/17 15:15:51 bostic Exp $ - */ - -#include "db_config.h" - -#define __INCLUDE_NETWORKING 1 -#include "db_int.h" - -typedef STAILQ_HEAD(__repmgr_q_header, __repmgr_message) QUEUE_HEADER; -struct __repmgr_queue { - int size; - QUEUE_HEADER header; -}; - -/* - * PUBLIC: int __repmgr_queue_create __P((DB_ENV *, DB_REP *)); - */ -int -__repmgr_queue_create(dbenv, db_rep) - DB_ENV *dbenv; - DB_REP *db_rep; -{ - REPMGR_QUEUE *q; - int ret; - - if ((ret = __os_calloc(dbenv, 1, sizeof(REPMGR_QUEUE), &q)) != 0) - return (ret); - q->size = 0; - STAILQ_INIT(&q->header); - db_rep->input_queue = q; - return (0); -} - -/* - * Frees not only the queue header, but also any messages that may be on it, - * along with their data buffers. - * - * PUBLIC: void __repmgr_queue_destroy __P((DB_ENV *)); - */ -void -__repmgr_queue_destroy(dbenv) - DB_ENV *dbenv; -{ - REPMGR_QUEUE *q; - REPMGR_MESSAGE *m; - - if ((q = dbenv->rep_handle->input_queue) == NULL) - return; - - while (!STAILQ_EMPTY(&q->header)) { - m = STAILQ_FIRST(&q->header); - STAILQ_REMOVE_HEAD(&q->header, entries); - __os_free(dbenv, m); - } - __os_free(dbenv, q); -} - -/* - * PUBLIC: int __repmgr_queue_get __P((DB_ENV *, REPMGR_MESSAGE **)); - * - * Get the first input message from the queue and return it to the caller. The - * caller hereby takes responsibility for the entire message buffer, and should - * free it when done. - * - * Note that caller is NOT expected to hold the mutex. This is asymmetric with - * put(), because put() is expected to be called in a loop after select, where - * it's already necessary to be holding the mutex. - */ -int -__repmgr_queue_get(dbenv, msgp) - DB_ENV *dbenv; - REPMGR_MESSAGE **msgp; -{ - DB_REP *db_rep; - REPMGR_QUEUE *q; - REPMGR_MESSAGE *m; - int ret; - - ret = 0; - db_rep = dbenv->rep_handle; - q = db_rep->input_queue; - - LOCK_MUTEX(db_rep->mutex); - while (STAILQ_EMPTY(&q->header) && !db_rep->finished) { -#ifdef DB_WIN32 - if (!ResetEvent(db_rep->queue_nonempty)) { - ret = GetLastError(); - goto err; - } - if (SignalObjectAndWait(db_rep->mutex, db_rep->queue_nonempty, - INFINITE, FALSE) != WAIT_OBJECT_0) { - ret = GetLastError(); - goto err; - } - LOCK_MUTEX(db_rep->mutex); -#else - if ((ret = pthread_cond_wait(&db_rep->queue_nonempty, - &db_rep->mutex)) != 0) - goto err; -#endif - } - if (db_rep->finished) - ret = DB_REP_UNAVAIL; - else { - m = STAILQ_FIRST(&q->header); - STAILQ_REMOVE_HEAD(&q->header, entries); - q->size--; - *msgp = m; - } - -err: - UNLOCK_MUTEX(db_rep->mutex); - return (ret); -} - -/* - * PUBLIC: int __repmgr_queue_put __P((DB_ENV *, REPMGR_MESSAGE *)); - * - * !!! - * Caller must hold repmgr->mutex. - */ -int -__repmgr_queue_put(dbenv, msg) - DB_ENV *dbenv; - REPMGR_MESSAGE *msg; -{ - DB_REP *db_rep; - REPMGR_QUEUE *q; - - db_rep = dbenv->rep_handle; - q = db_rep->input_queue; - - STAILQ_INSERT_TAIL(&q->header, msg, entries); - q->size++; - - return (__repmgr_signal(&db_rep->queue_nonempty)); -} - -/* - * PUBLIC: int __repmgr_queue_size __P((DB_ENV *)); - * - * !!! - * Caller must hold repmgr->mutex. - */ -int -__repmgr_queue_size(dbenv) - DB_ENV *dbenv; -{ - return (dbenv->rep_handle->input_queue->size); -} diff --git a/db/repmgr/repmgr_sel.c b/db/repmgr/repmgr_sel.c deleted file mode 100644 index f1aa2ad88..000000000 --- a/db/repmgr/repmgr_sel.c +++ /dev/null @@ -1,875 +0,0 @@ -/*- - * See the file LICENSE for redistribution information. - * - * Copyright (c) 2006,2007 Oracle. All rights reserved. - * - * $Id: repmgr_sel.c,v 1.36 2007/06/11 18:29:34 alanb Exp $ - */ - -#include "db_config.h" - -#define __INCLUDE_NETWORKING 1 -#include "db_int.h" - -static int __repmgr_connect __P((DB_ENV*, socket_t *, REPMGR_SITE *)); -static int dispatch_phase_completion __P((DB_ENV *, REPMGR_CONNECTION *)); -static int notify_handshake __P((DB_ENV *, REPMGR_CONNECTION *)); -static int record_ack __P((DB_ENV *, REPMGR_SITE *, DB_REPMGR_ACK *)); -static int __repmgr_try_one __P((DB_ENV *, u_int)); - -/* - * PUBLIC: void *__repmgr_select_thread __P((void *)); - */ -void * -__repmgr_select_thread(args) - void *args; -{ - DB_ENV *dbenv = args; - int ret; - - if ((ret = __repmgr_select_loop(dbenv)) != 0) { - __db_err(dbenv, ret, "select loop failed"); - __repmgr_thread_failure(dbenv, ret); - } - return (NULL); -} - -/* - * PUBLIC: int __repmgr_accept __P((DB_ENV *)); - * - * !!! - * Only ever called in the select() thread, since we may call - * __repmgr_bust_connection(..., TRUE). - */ -int -__repmgr_accept(dbenv) - DB_ENV *dbenv; -{ - DB_REP *db_rep; - REPMGR_CONNECTION *conn; - struct sockaddr_in siaddr; - socklen_t addrlen; - socket_t s; - int ret; -#ifdef DB_WIN32 - WSAEVENT event_obj; -#endif - - db_rep = dbenv->rep_handle; - addrlen = sizeof(siaddr); - if ((s = accept(db_rep->listen_fd, (struct sockaddr *)&siaddr, - &addrlen)) == -1) { - /* - * Some errors are innocuous and so should be ignored. MSDN - * Library documents the Windows ones; the Unix ones are - * advocated in Stevens' UNPv1, section 16.6; and Linux - * Application Development, p. 416. - */ - switch (ret = net_errno) { -#ifdef DB_WIN32 - case WSAECONNRESET: - case WSAEWOULDBLOCK: -#else - case EINTR: - case EWOULDBLOCK: - case ECONNABORTED: - case ENETDOWN: -#ifdef EPROTO - case EPROTO: -#endif - case ENOPROTOOPT: - case EHOSTDOWN: -#ifdef ENONET - case ENONET: -#endif - case EHOSTUNREACH: - case EOPNOTSUPP: - case ENETUNREACH: -#endif - RPRINT(dbenv, (dbenv, - "accept error %d considered innocuous", ret)); - return (0); - default: - __db_err(dbenv, ret, "accept error"); - return (ret); - } - } - RPRINT(dbenv, (dbenv, "accepted a new connection")); - - if ((ret = __repmgr_set_nonblocking(s)) != 0) { - __db_err(dbenv, ret, "can't set nonblock after accept"); - (void)closesocket(s); - return (ret); - } - -#ifdef DB_WIN32 - if ((event_obj = WSACreateEvent()) == WSA_INVALID_EVENT) { - ret = net_errno; - __db_err(dbenv, ret, "can't create WSA event"); - (void)closesocket(s); - return (ret); - } - if (WSAEventSelect(s, event_obj, FD_READ|FD_CLOSE) == SOCKET_ERROR) { - ret = net_errno; - __db_err(dbenv, ret, "can't set desired event bits"); - (void)WSACloseEvent(event_obj); - (void)closesocket(s); - return (ret); - } -#endif - if ((ret = __repmgr_new_connection(dbenv, &conn, s, 0)) != 0) { -#ifdef DB_WIN32 - (void)WSACloseEvent(event_obj); -#endif - (void)closesocket(s); - return (ret); - } - conn->eid = -1; -#ifdef DB_WIN32 - conn->event_object = event_obj; -#endif - - switch (ret = __repmgr_send_handshake(dbenv, conn)) { - case 0: - return (0); - case DB_REP_UNAVAIL: - return (__repmgr_bust_connection(dbenv, conn, TRUE)); - default: - return (ret); - } -} - -/* - * Initiates connection attempts for any sites on the idle list whose retry - * times have expired. - * - * PUBLIC: int __repmgr_retry_connections __P((DB_ENV *)); - * - * !!! - * Assumes caller holds the mutex. - */ -int -__repmgr_retry_connections(dbenv) - DB_ENV *dbenv; -{ - DB_REP *db_rep; - REPMGR_RETRY *retry; - db_timespec now; - u_int eid; - int ret; - - db_rep = dbenv->rep_handle; - __os_gettime(dbenv, &now); - - while (!TAILQ_EMPTY(&db_rep->retries)) { - retry = TAILQ_FIRST(&db_rep->retries); - if (timespeccmp(&retry->time, &now, >=)) - break; /* since items are in time order */ - - TAILQ_REMOVE(&db_rep->retries, retry, entries); - - eid = retry->eid; - __os_free(dbenv, retry); - - if ((ret = __repmgr_try_one(dbenv, eid)) != 0) - return (ret); - } - return (0); -} - -/* - * PUBLIC: int __repmgr_first_try_connections __P((DB_ENV *)); - * - * !!! - * Assumes caller holds the mutex. - */ -int -__repmgr_first_try_connections(dbenv) - DB_ENV *dbenv; -{ - DB_REP *db_rep; - u_int eid; - int ret; - - db_rep = dbenv->rep_handle; - for (eid=0; eid<db_rep->site_cnt; eid++) - if ((ret = __repmgr_try_one(dbenv, eid)) != 0) - return (ret); - return (0); -} - -/* - * Makes a best-effort attempt to connect to the indicated site. Returns a - * non-zero error indication only for disastrous failures. For re-tryable - * errors, we will have scheduled another attempt, and that can be considered - * success enough. - */ -static int -__repmgr_try_one(dbenv, eid) - DB_ENV *dbenv; - u_int eid; -{ - DB_REP *db_rep; - ADDRINFO *list; - repmgr_netaddr_t *addr; - int ret; - - db_rep = dbenv->rep_handle; - - /* - * If have never yet successfully resolved this site's host name, try to - * do so now. - * - * Throughout all the rest of repmgr, we almost never do any sort of - * blocking operation in the select thread. This is the sole exception - * to that rule. Fortunately, it should rarely happen: - * - * - for a site that we only learned about because it connected to us: - * not only were we not configured to know about it, but we also never - * got a NEWSITE message about it. And even then only if the - * connection fails and we want to retry it from this end; - * - * - if the name look-up system (e.g., DNS) is not working (let's hope - * it's temporary), or the host name is not found. - */ - addr = &SITE_FROM_EID(eid)->net_addr; - if (ADDR_LIST_FIRST(addr) == NULL) { - if ((ret = __repmgr_getaddr( - dbenv, addr->host, addr->port, 0, &list)) == 0) { - addr->address_list = list; - (void)ADDR_LIST_FIRST(addr); - } else if (ret == DB_REP_UNAVAIL) - return (__repmgr_schedule_connection_attempt( - dbenv, eid, FALSE)); - else - return (ret); - } - - /* Here, when we have a valid address. */ - return (__repmgr_connect_site(dbenv, eid)); -} - -/* - * Tries to establish a connection with the site indicated by the given eid, - * starting with the "current" element of its address list and trying as many - * addresses as necessary until the list is exhausted. - * - * !!! - * Only ever called in the select() thread, since we may call - * __repmgr_bust_connection(..., TRUE). - * - * PUBLIC: int __repmgr_connect_site __P((DB_ENV *, u_int eid)); - */ -int -__repmgr_connect_site(dbenv, eid) - DB_ENV *dbenv; - u_int eid; -{ - DB_REP *db_rep; - REPMGR_SITE *site; - REPMGR_CONNECTION *con; - socket_t s; - u_int32_t flags; - int ret; -#ifdef DB_WIN32 - long desired_event; - WSAEVENT event_obj; -#endif - - db_rep = dbenv->rep_handle; - site = SITE_FROM_EID(eid); - - flags = 0; - switch (ret = __repmgr_connect(dbenv, &s, site)) { - case 0: - flags = 0; -#ifdef DB_WIN32 - desired_event = FD_READ|FD_CLOSE; -#endif - break; - case INPROGRESS: - flags = CONN_CONNECTING; -#ifdef DB_WIN32 - desired_event = FD_CONNECT; -#endif - break; - default: - STAT(db_rep->region->mstat.st_connect_fail++); - return ( - __repmgr_schedule_connection_attempt(dbenv, eid, FALSE)); - } - -#ifdef DB_WIN32 - if ((event_obj = WSACreateEvent()) == WSA_INVALID_EVENT) { - ret = net_errno; - __db_err(dbenv, ret, "can't create WSA event"); - (void)closesocket(s); - return (ret); - } - if (WSAEventSelect(s, event_obj, desired_event) == SOCKET_ERROR) { - ret = net_errno; - __db_err(dbenv, ret, "can't set desired event bits"); - (void)WSACloseEvent(event_obj); - (void)closesocket(s); - return (ret); - } -#endif - - if ((ret = __repmgr_new_connection(dbenv, &con, s, flags)) - != 0) { -#ifdef DB_WIN32 - (void)WSACloseEvent(event_obj); -#endif - (void)closesocket(s); - return (ret); - } -#ifdef DB_WIN32 - con->event_object = event_obj; -#endif - - if (flags == 0) { - switch (ret = __repmgr_send_handshake(dbenv, con)) { - case 0: - break; - case DB_REP_UNAVAIL: - return (__repmgr_bust_connection(dbenv, con, TRUE)); - default: - return (ret); - } - } - - con->eid = (int)eid; - - site->ref.conn = con; - site->state = SITE_CONNECTED; - return (0); -} - -static int -__repmgr_connect(dbenv, socket_result, site) - DB_ENV *dbenv; - socket_t *socket_result; - REPMGR_SITE *site; -{ - repmgr_netaddr_t *addr; - ADDRINFO *ai; - socket_t s; - char *why; - int ret; - SITE_STRING_BUFFER buffer; - - /* - * Lint doesn't know about DB_ASSERT, so it can't tell that this - * loop will always get executed at least once, giving 'why' a value. - */ - COMPQUIET(why, ""); - addr = &site->net_addr; - ai = ADDR_LIST_CURRENT(addr); - DB_ASSERT(dbenv, ai != NULL); - for (; ai != NULL; ai = ADDR_LIST_NEXT(addr)) { - - if ((s = socket(ai->ai_family, - ai->ai_socktype, ai->ai_protocol)) == SOCKET_ERROR) { - why = "can't create socket to connect"; - continue; - } - - if ((ret = __repmgr_set_nonblocking(s)) != 0) { - __db_err(dbenv, - ret, "can't make nonblock socket to connect"); - (void)closesocket(s); - return (ret); - } - - if (connect(s, ai->ai_addr, (socklen_t)ai->ai_addrlen) != 0) - ret = net_errno; - - if (ret == 0 || ret == INPROGRESS) { - *socket_result = s; - RPRINT(dbenv, (dbenv, - "init connection to %s with result %d", - __repmgr_format_site_loc(site, buffer), ret)); - return (ret); - } - - why = "connection failed"; - (void)closesocket(s); - } - - /* We've exhausted all possible addresses. */ - ret = net_errno; - __db_err(dbenv, ret, "%s to %s", why, - __repmgr_format_site_loc(site, buffer)); - return (ret); -} - -/* - * PUBLIC: int __repmgr_send_handshake __P((DB_ENV *, REPMGR_CONNECTION *)); - */ -int -__repmgr_send_handshake(dbenv, conn) - DB_ENV *dbenv; - REPMGR_CONNECTION *conn; -{ - DB_REP *db_rep; - REP *rep; - repmgr_netaddr_t *my_addr; - DB_REPMGR_HANDSHAKE buffer; - DBT cntrl, rec; - - DB_ASSERT(dbenv, !F_ISSET(conn, CONN_CONNECTING | CONN_DEFUNCT)); - - db_rep = dbenv->rep_handle; - rep = db_rep->region; - my_addr = &db_rep->my_addr; - - /* - * Remind us that we need to fix priority on the rep API not to be an - * int, if we're going to pass it across the wire. - */ - DB_ASSERT(dbenv, sizeof(u_int32_t) >= sizeof(int)); - - buffer.version = DB_REPMGR_VERSION; - buffer.priority = htonl((u_int32_t)rep->priority); - buffer.port = my_addr->port; - cntrl.data = &buffer; - cntrl.size = sizeof(buffer); - - DB_SET_DBT(rec, my_addr->host, strlen(my_addr->host) + 1); - - return (__repmgr_send_one(dbenv, conn, REPMGR_HANDSHAKE, &cntrl, &rec)); -} - -/* - * PUBLIC: int __repmgr_read_from_site __P((DB_ENV *, REPMGR_CONNECTION *)); - * - * !!! - * Caller is assumed to hold repmgr->mutex, 'cuz we call queue_put() from here. - */ -int -__repmgr_read_from_site(dbenv, conn) - DB_ENV *dbenv; - REPMGR_CONNECTION *conn; -{ - SITE_STRING_BUFFER buffer; - size_t nr; - int ret; - - /* - * Keep reading pieces as long as we're making some progress, or until - * we complete the current read phase. - */ - for (;;) { - if ((ret = __repmgr_readv(conn->fd, - &conn->iovecs.vectors[conn->iovecs.offset], - conn->iovecs.count - conn->iovecs.offset, &nr)) != 0) { - switch (ret) { -#ifndef DB_WIN32 - case EINTR: - continue; -#endif - case WOULDBLOCK: - return (0); - default: - (void)__repmgr_format_eid_loc(dbenv->rep_handle, - conn->eid, buffer); - __db_err(dbenv, ret, - "can't read from %s", buffer); - STAT(dbenv->rep_handle-> - region->mstat.st_connection_drop++); - return (DB_REP_UNAVAIL); - } - } - - if (nr > 0) { - if (__repmgr_update_consumed(&conn->iovecs, nr)) - return (dispatch_phase_completion(dbenv, - conn)); - } else { - (void)__repmgr_format_eid_loc(dbenv->rep_handle, - conn->eid, buffer); - __db_errx(dbenv, "EOF on connection from %s", buffer); - STAT(dbenv->rep_handle-> - region->mstat.st_connection_drop++); - return (DB_REP_UNAVAIL); - } - } -} - -/* - * Handles whatever needs to be done upon the completion of a reading phase on a - * given connection. - */ -static int -dispatch_phase_completion(dbenv, conn) - DB_ENV *dbenv; - REPMGR_CONNECTION *conn; -{ -#define MEM_ALIGN sizeof(double) - DB_REP *db_rep; - DB_REPMGR_ACK *ack; - REPMGR_SITE *site; - DB_REPMGR_HANDSHAKE *handshake; - REPMGR_RETRY *retry; - repmgr_netaddr_t addr; - DBT *dbt; - u_int32_t control_size, rec_size; - size_t memsize, control_offset, rec_offset; - void *membase; - char *host; - u_int port; - int ret, eid; - - db_rep = dbenv->rep_handle; - switch (conn->reading_phase) { - case SIZES_PHASE: - /* - * We've received the header: a message type and the lengths of - * the two pieces of the message. Set up buffers to read the - * two pieces. - */ - - if (conn->msg_type != REPMGR_HANDSHAKE && - !IS_VALID_EID(conn->eid)) { - __db_errx(dbenv, - "expected handshake as first msg from passively connected site"); - return (DB_REP_UNAVAIL); - } - - __repmgr_iovec_init(&conn->iovecs); - control_size = ntohl(conn->control_size_buf); - rec_size = ntohl(conn->rec_size_buf); - if (conn->msg_type == REPMGR_REP_MESSAGE) { - /* - * Allocate a block of memory large enough to hold a - * DB_REPMGR_MESSAGE wrapper, plus the (one or) two DBT - * data areas that it points to. Start by calculating - * the total memory needed, rounding up for the start of - * each DBT, to ensure possible alignment requirements. - */ - memsize = (size_t) - DB_ALIGN(sizeof(REPMGR_MESSAGE), MEM_ALIGN); - control_offset = memsize; - memsize += control_size; - if (rec_size > 0) { - memsize = (size_t)DB_ALIGN(memsize, MEM_ALIGN); - rec_offset = memsize; - memsize += rec_size; - } else - COMPQUIET(rec_offset, 0); - if ((ret = __os_malloc(dbenv, memsize, &membase)) != 0) - return (ret); - conn->input.rep_message = membase; - memset(&conn->input.rep_message->control, 0, - sizeof(DBT)); - memset(&conn->input.rep_message->rec, 0, sizeof(DBT)); - conn->input.rep_message->originating_eid = conn->eid; - conn->input.rep_message->control.size = control_size; - conn->input.rep_message->control.data = - (u_int8_t*)membase + control_offset; - __repmgr_add_buffer(&conn->iovecs, - conn->input.rep_message->control.data, - control_size); - - conn->input.rep_message->rec.size = rec_size; - if (rec_size > 0) { - conn->input.rep_message->rec.data = - (u_int8_t*)membase + rec_offset; - __repmgr_add_buffer(&conn->iovecs, - conn->input.rep_message->rec.data, - rec_size); - } else - conn->input.rep_message->rec.data = NULL; - - } else { - if (control_size == 0) { - __db_errx( - dbenv, "illegal size for non-rep msg"); - return (DB_REP_UNAVAIL); - } - conn->input.repmgr_msg.cntrl.size = control_size; - conn->input.repmgr_msg.rec.size = rec_size; - - dbt = &conn->input.repmgr_msg.cntrl; - dbt->size = control_size; - if ((ret = __os_malloc(dbenv, control_size, - &dbt->data)) != 0) - return (ret); - __repmgr_add_dbt(&conn->iovecs, dbt); - - dbt = &conn->input.repmgr_msg.rec; - if ((dbt->size = rec_size) > 0) { - if ((ret = __os_malloc(dbenv, rec_size, - &dbt->data)) != 0) { - __os_free(dbenv, - conn->input.repmgr_msg.cntrl.data); - return (ret); - } - __repmgr_add_dbt(&conn->iovecs, dbt); - } - } - - conn->reading_phase = DATA_PHASE; - break; - - case DATA_PHASE: - /* - * We have a complete message, so process it. Acks and - * handshakes get processed here, in line. Regular rep messages - * get posted to a queue, to be handled by a thread from the - * message thread pool. - */ - switch (conn->msg_type) { - case REPMGR_ACK: - /* - * Extract the LSN. Save it only if it is an - * improvement over what the site has already ack'ed. - */ - ack = conn->input.repmgr_msg.cntrl.data; - if (conn->input.repmgr_msg.cntrl.size != sizeof(*ack) || - conn->input.repmgr_msg.rec.size != 0) { - __db_errx(dbenv, "bad ack msg size"); - return (DB_REP_UNAVAIL); - } - if ((ret = record_ack(dbenv, SITE_FROM_EID(conn->eid), - ack)) != 0) - return (ret); - __os_free(dbenv, conn->input.repmgr_msg.cntrl.data); - break; - - case REPMGR_HANDSHAKE: - handshake = conn->input.repmgr_msg.cntrl.data; - if (conn->input.repmgr_msg.cntrl.size >= - sizeof(handshake->version) && - handshake->version != DB_REPMGR_VERSION) { - __db_errx(dbenv, - "mismatched repmgr message protocol version (%lu)", - (u_long)handshake->version); - return (DB_REP_UNAVAIL); - } - if (conn->input.repmgr_msg.cntrl.size != - sizeof(*handshake) || - conn->input.repmgr_msg.rec.size == 0) { - __db_errx(dbenv, "bad handshake msg size"); - return (DB_REP_UNAVAIL); - } - - port = handshake->port; - host = conn->input.repmgr_msg.rec.data; - host[conn->input.repmgr_msg.rec.size-1] = '\0'; - - RPRINT(dbenv, (dbenv, - "got handshake %s:%u, pri %lu", host, port, - (u_long)ntohl(handshake->priority))); - - if (IS_VALID_EID(conn->eid)) { - /* - * We must have initiated this as an outgoing - * connection, since we already know the EID. - * All we need from the handshake is the - * priority. - */ - site = SITE_FROM_EID(conn->eid); - RPRINT(dbenv, (dbenv, - "handshake from connection to %s:%lu", - site->net_addr.host, - (u_long)site->net_addr.port)); - } else { - if (IS_VALID_EID(eid = - __repmgr_find_site(dbenv, host, port))) { - site = SITE_FROM_EID(eid); - if (site->state == SITE_IDLE) { - RPRINT(dbenv, (dbenv, - "handshake from previously idle site")); - retry = site->ref.retry; - TAILQ_REMOVE(&db_rep->retries, - retry, entries); - __os_free(dbenv, retry); - - conn->eid = eid; - site->state = SITE_CONNECTED; - site->ref.conn = conn; - } else { - /* - * We got an incoming connection - * for a site we were already - * connected to; discard it. - */ - __db_errx(dbenv, - "redundant incoming connection will be ignored"); - return (DB_REP_UNAVAIL); - } - } else { - RPRINT(dbenv, (dbenv, - "handshake introduces unknown site")); - if ((ret = __repmgr_pack_netaddr( - dbenv, host, port, NULL, - &addr)) != 0) - return (ret); - if ((ret = __repmgr_new_site( - dbenv, &site, &addr, - SITE_CONNECTED)) != 0) { - __repmgr_cleanup_netaddr(dbenv, - &addr); - return (ret); - } - conn->eid = EID_FROM_SITE(site); - site->ref.conn = conn; - } - } - - /* TODO: change priority to be u_int32_t. */ - DB_ASSERT(dbenv, sizeof(int) == sizeof(u_int32_t)); - site->priority = (int)ntohl(handshake->priority); - - if ((ret = notify_handshake(dbenv, conn)) != 0) - return (ret); - - __os_free(dbenv, conn->input.repmgr_msg.cntrl.data); - __os_free(dbenv, conn->input.repmgr_msg.rec.data); - break; - - case REPMGR_REP_MESSAGE: - if ((ret = __repmgr_queue_put(dbenv, - conn->input.rep_message)) != 0) - return (ret); - /* - * The queue has taken over responsibility for the - * rep_message buffer, and will free it later. - */ - break; - - default: - __db_errx(dbenv, "unknown msg type rcvd: %d", - (int)conn->msg_type); - return (DB_REP_UNAVAIL); - } - - __repmgr_reset_for_reading(conn); - break; - - default: - DB_ASSERT(dbenv, FALSE); - } - - return (0); -} - -/* - * Performs any processing needed upon the receipt of a handshake. - * - * !!! - * Caller must hold mutex. - */ -static int -notify_handshake(dbenv, conn) - DB_ENV *dbenv; - REPMGR_CONNECTION *conn; -{ - DB_REP *db_rep; - - COMPQUIET(conn, NULL); - - db_rep = dbenv->rep_handle; - /* - * If we're moping around wishing we knew who the master was, then - * getting in touch with another site might finally provide sufficient - * connectivity to find out. But just do this once, because otherwise - * we get messages while the subsequent rep_start operations are going - * on, and rep tosses them in that case. - */ - if (db_rep->master_eid == DB_EID_INVALID && !db_rep->done_one) { - db_rep->done_one = TRUE; - RPRINT(dbenv, (dbenv, - "handshake with no known master to wake election thread")); - return (__repmgr_init_election(dbenv, ELECT_REPSTART)); - } - return (0); -} - -static int -record_ack(dbenv, site, ack) - DB_ENV *dbenv; - REPMGR_SITE *site; - DB_REPMGR_ACK *ack; -{ - DB_REP *db_rep; - SITE_STRING_BUFFER buffer; - int ret; - - db_rep = dbenv->rep_handle; - - /* Ignore stale acks. */ - if (ack->generation < db_rep->generation) { - RPRINT(dbenv, (dbenv, - "ignoring stale ack (%lu<%lu), from %s", - (u_long)ack->generation, (u_long)db_rep->generation, - __repmgr_format_site_loc(site, buffer))); - return (0); - } - RPRINT(dbenv, (dbenv, - "got ack [%lu][%lu](%lu) from %s", (u_long)ack->lsn.file, - (u_long)ack->lsn.offset, (u_long)ack->generation, - __repmgr_format_site_loc(site, buffer))); - - if (ack->generation == db_rep->generation && - log_compare(&ack->lsn, &site->max_ack) == 1) { - memcpy(&site->max_ack, &ack->lsn, sizeof(DB_LSN)); - if ((ret = __repmgr_wake_waiting_senders(dbenv)) != 0) - return (ret); - } - return (0); -} - -/* - * PUBLIC: int __repmgr_write_some __P((DB_ENV *, REPMGR_CONNECTION *)); - */ -int -__repmgr_write_some(dbenv, conn) - DB_ENV *dbenv; - REPMGR_CONNECTION *conn; -{ - REPMGR_FLAT *msg; - QUEUED_OUTPUT *output; - int bytes, ret; - - while (!STAILQ_EMPTY(&conn->outbound_queue)) { - output = STAILQ_FIRST(&conn->outbound_queue); - msg = output->msg; - if ((bytes = send(conn->fd, &msg->data[output->offset], - (size_t)msg->length - output->offset, 0)) == SOCKET_ERROR) { - if ((ret = net_errno) == WOULDBLOCK) - return (0); - else { - __db_err(dbenv, ret, "writing data"); - STAT(dbenv->rep_handle-> - region->mstat.st_connection_drop++); - return (DB_REP_UNAVAIL); - } - } - - if ((output->offset += (size_t)bytes) >= msg->length) { - STAILQ_REMOVE_HEAD(&conn->outbound_queue, entries); - __os_free(dbenv, output); - conn->out_queue_length--; - if (--msg->ref_count <= 0) - __os_free(dbenv, msg); - } - } - -#ifdef DB_WIN32 - /* - * With the queue now empty, it's time to relinquish ownership of this - * connection again, so that the next call to send() can write the - * message in line, instead of posting it to the queue for us. - */ - if (WSAEventSelect(conn->fd, conn->event_object, FD_READ|FD_CLOSE) - == SOCKET_ERROR) { - ret = net_errno; - __db_err(dbenv, ret, "can't remove FD_WRITE event bit"); - return (ret); - } -#endif - - return (0); -} diff --git a/db/repmgr/repmgr_stat.c b/db/repmgr/repmgr_stat.c deleted file mode 100644 index 825bfa765..000000000 --- a/db/repmgr/repmgr_stat.c +++ /dev/null @@ -1,298 +0,0 @@ -/*- - * See the file LICENSE for redistribution information. - * - * Copyright (c) 2005,2007 Oracle. All rights reserved. - * - * $Id: repmgr_stat.c,v 1.35 2007/06/22 18:26:52 bostic Exp $ - */ - -#include "db_config.h" - -#define __INCLUDE_NETWORKING 1 -#include "db_int.h" - -#ifdef HAVE_STATISTICS -static int __repmgr_print_all __P((DB_ENV *, u_int32_t)); -static int __repmgr_print_sites __P((DB_ENV *)); -static int __repmgr_print_stats __P((DB_ENV *, u_int32_t)); -static int __repmgr_stat __P((DB_ENV *, DB_REPMGR_STAT **, u_int32_t)); -static int __repmgr_stat_print __P((DB_ENV *, u_int32_t)); - -/* - * __repmgr_stat_pp -- - * DB_ENV->repmgr_stat pre/post processing. - * - * PUBLIC: int __repmgr_stat_pp __P((DB_ENV *, DB_REPMGR_STAT **, u_int32_t)); - */ -int -__repmgr_stat_pp(dbenv, statp, flags) - DB_ENV *dbenv; - DB_REPMGR_STAT **statp; - u_int32_t flags; -{ - DB_THREAD_INFO *ip; - int ret; - - PANIC_CHECK(dbenv); - ENV_REQUIRES_CONFIG_XX( - dbenv, rep_handle, "DB_ENV->repmgr_stat", DB_INIT_REP); - - if ((ret = __db_fchk(dbenv, - "DB_ENV->repmgr_stat", flags, DB_STAT_CLEAR)) != 0) - return (ret); - - ENV_ENTER(dbenv, ip); - ret = __repmgr_stat(dbenv, statp, flags); - ENV_LEAVE(dbenv, ip); - - return (ret); -} - -/* - * __repmgr_stat -- - * DB_ENV->repmgr_stat. - */ -static int -__repmgr_stat(dbenv, statp, flags) - DB_ENV *dbenv; - DB_REPMGR_STAT **statp; - u_int32_t flags; -{ - DB_REP *db_rep; - DB_REPMGR_STAT *stats; - int ret; - - db_rep = dbenv->rep_handle; - - *statp = NULL; - - /* Allocate a stat struct to return to the user. */ - if ((ret = __os_umalloc(dbenv, sizeof(DB_REPMGR_STAT), &stats)) != 0) - return (ret); - - memcpy(stats, &db_rep->region->mstat, sizeof(*stats)); - if (LF_ISSET(DB_STAT_CLEAR)) - memset(&db_rep->region->mstat, 0, sizeof(DB_REPMGR_STAT)); - - *statp = stats; - return (0); -} - -/* - * __repmgr_stat_print_pp -- - * DB_ENV->repmgr_stat_print pre/post processing. - * - * PUBLIC: int __repmgr_stat_print_pp __P((DB_ENV *, u_int32_t)); - */ -int -__repmgr_stat_print_pp(dbenv, flags) - DB_ENV *dbenv; - u_int32_t flags; -{ - DB_THREAD_INFO *ip; - int ret; - - PANIC_CHECK(dbenv); - ENV_REQUIRES_CONFIG_XX( - dbenv, rep_handle, "DB_ENV->repmgr_stat_print", DB_INIT_REP); - - if ((ret = __db_fchk(dbenv, "DB_ENV->repmgr_stat_print", - flags, DB_STAT_ALL | DB_STAT_CLEAR)) != 0) - return (ret); - - ENV_ENTER(dbenv, ip); - ret = __repmgr_stat_print(dbenv, flags); - ENV_LEAVE(dbenv, ip); - - return (ret); -} - -int -__repmgr_stat_print(dbenv, flags) - DB_ENV *dbenv; - u_int32_t flags; -{ - u_int32_t orig_flags; - int ret; - - orig_flags = flags; - LF_CLR(DB_STAT_CLEAR | DB_STAT_SUBSYSTEM); - if (flags == 0 || LF_ISSET(DB_STAT_ALL)) { - if ((ret = __repmgr_print_stats(dbenv, orig_flags)) == 0) - ret = __repmgr_print_sites(dbenv); - if (flags == 0 || ret != 0) - return (ret); - } - - if (LF_ISSET(DB_STAT_ALL) && - (ret = __repmgr_print_all(dbenv, orig_flags)) != 0) - return (ret); - - return (0); -} - -static int -__repmgr_print_stats(dbenv, flags) - DB_ENV *dbenv; - u_int32_t flags; -{ - DB_REPMGR_STAT *sp; - int ret; - - if ((ret = __repmgr_stat(dbenv, &sp, flags)) != 0) - return (ret); - - __db_dl(dbenv, "Number of PERM messages not acknowledged", - (u_long)sp->st_perm_failed); - __db_dl(dbenv, "Number of messages queued due to network delay", - (u_long)sp->st_msgs_queued); - __db_dl(dbenv, "Number of messages discarded due to queue length", - (u_long)sp->st_msgs_dropped); - __db_dl(dbenv, "Number of existing connections dropped", - (u_long)sp->st_connection_drop); - __db_dl(dbenv, "Number of failed new connection attempts", - (u_long)sp->st_connect_fail); - - __os_ufree(dbenv, sp); - - return (0); -} - -static int -__repmgr_print_sites(dbenv) - DB_ENV *dbenv; -{ - DB_REPMGR_SITE *list; - u_int count, i; - int ret; - - if ((ret = __repmgr_site_list(dbenv, &count, &list)) != 0) - return (ret); - - if (count == 0) - return (0); - - __db_msg(dbenv, "%s", DB_GLOBAL(db_line)); - __db_msg(dbenv, "DB_REPMGR site information:"); - - for (i = 0; i < count; ++i) { - __db_msg(dbenv, "%s (eid: %d, port: %u, %sconnected)", - list[i].host, list[i].eid, list[i].port, - list[i].status == DB_REPMGR_CONNECTED ? "" : "dis"); - } - - __os_ufree(dbenv, list); - - return (0); -} - -/* - * __repmgr_print_all -- - * Display debugging replication manager statistics. - */ -static int -__repmgr_print_all(dbenv, flags) - DB_ENV *dbenv; - u_int32_t flags; -{ - COMPQUIET(dbenv, NULL); - COMPQUIET(flags, 0); - return (0); -} - -#else /* !HAVE_STATISTICS */ - -int -__repmgr_stat_pp(dbenv, statp, flags) - DB_ENV *dbenv; - DB_REPMGR_STAT **statp; - u_int32_t flags; -{ - COMPQUIET(statp, NULL); - COMPQUIET(flags, 0); - - return (__db_stat_not_built(dbenv)); -} - -int -__repmgr_stat_print_pp(dbenv, flags) - DB_ENV *dbenv; - u_int32_t flags; -{ - COMPQUIET(flags, 0); - - return (__db_stat_not_built(dbenv)); -} -#endif - -/* - * PUBLIC: int __repmgr_site_list __P((DB_ENV *, u_int *, DB_REPMGR_SITE **)); - */ -int -__repmgr_site_list(dbenv, countp, listp) - DB_ENV *dbenv; - u_int *countp; - DB_REPMGR_SITE **listp; -{ - DB_REP *db_rep; - DB_REPMGR_SITE *status; - REPMGR_SITE *site; - size_t array_size, total_size; - u_int count, i; - int locked, ret; - char *name; - - db_rep = dbenv->rep_handle; - if (REPMGR_SYNC_INITED(db_rep)) { - LOCK_MUTEX(db_rep->mutex); - locked = TRUE; - } else - locked = FALSE; - - /* Initialize for empty list or error return. */ - ret = 0; - *countp = 0; - *listp = NULL; - - /* First, add up how much memory we need for the host names. */ - if ((count = db_rep->site_cnt) == 0) - goto err; - - array_size = sizeof(DB_REPMGR_SITE) * count; - total_size = array_size; - for (i = 0; i < count; i++) { - site = &db_rep->sites[i]; - - /* Make room for the NUL terminating byte. */ - total_size += strlen(site->net_addr.host) + 1; - } - - if ((ret = __os_umalloc(dbenv, total_size, &status)) != 0) - goto err; - - /* - * Put the storage for the host names after the array of structs. This - * way, the caller can free the whole thing in one single operation. - */ - name = (char *)((u_int8_t *)status + array_size); - for (i = 0; i < count; i++) { - site = &db_rep->sites[i]; - - status[i].eid = EID_FROM_SITE(site); - - status[i].host = name; - (void)strcpy(name, site->net_addr.host); - name += strlen(name) + 1; - - status[i].port = site->net_addr.port; - status[i].status = site->state == SITE_CONNECTED ? - DB_REPMGR_CONNECTED : DB_REPMGR_DISCONNECTED; - } - - *countp = count; - *listp = status; - -err: if (locked) - UNLOCK_MUTEX(db_rep->mutex); - return (ret); -} diff --git a/db/repmgr/repmgr_stub.c b/db/repmgr/repmgr_stub.c deleted file mode 100644 index 1f74f6f01..000000000 --- a/db/repmgr/repmgr_stub.c +++ /dev/null @@ -1,198 +0,0 @@ -/*- - * See the file LICENSE for redistribution information. - * - * Copyright (c) 1996,2007 Oracle. All rights reserved. - * - * $Id: repmgr_stub.c,v 1.7 2007/05/17 15:15:51 bostic Exp $ - */ - -#ifndef HAVE_REPLICATION_THREADS -#include "db_config.h" - -#include "db_int.h" - -/* - * If the library wasn't compiled with replication support, various routines - * aren't available. Stub them here, returning an appropriate error. - */ -static int __db_norepmgr __P((DB_ENV *)); - -/* - * __db_norepmgr -- - * Error when a Berkeley DB build doesn't include replication mgr support. - */ -static int -__db_norepmgr(dbenv) - DB_ENV *dbenv; -{ - __db_errx(dbenv, - "library build did not include support for the Replication Manager"); - return (DB_OPNOTSUP); -} - -/* - * PUBLIC: #ifndef HAVE_REPLICATION_THREADS - * PUBLIC: int __repmgr_close __P((DB_ENV *)); - * PUBLIC: #endif - */ -int -__repmgr_close(dbenv) - DB_ENV *dbenv; -{ - COMPQUIET(dbenv, NULL); - return (0); -} - -/* - * PUBLIC: #ifndef HAVE_REPLICATION_THREADS - * PUBLIC: int __repmgr_add_remote_site - * PUBLIC: __P((DB_ENV *, const char *, u_int, int *, u_int32_t)); - * PUBLIC: #endif - */ -int -__repmgr_add_remote_site(dbenv, host, port, eidp, flags) - DB_ENV *dbenv; - const char *host; - u_int port; - int *eidp; - u_int32_t flags; -{ - COMPQUIET(host, NULL); - COMPQUIET(port, 0); - COMPQUIET(eidp, NULL); - COMPQUIET(flags, 0); - return (__db_norepmgr(dbenv)); -} - -/* - * PUBLIC: #ifndef HAVE_REPLICATION_THREADS - * PUBLIC: int __repmgr_get_ack_policy __P((DB_ENV *, int *)); - * PUBLIC: #endif - */ -int -__repmgr_get_ack_policy(dbenv, policy) - DB_ENV *dbenv; - int *policy; -{ - COMPQUIET(policy, NULL); - return (__db_norepmgr(dbenv)); -} - -/* - * PUBLIC: #ifndef HAVE_REPLICATION_THREADS - * PUBLIC: int __repmgr_set_ack_policy __P((DB_ENV *, int)); - * PUBLIC: #endif - */ -int -__repmgr_set_ack_policy(dbenv, policy) - DB_ENV *dbenv; - int policy; -{ - COMPQUIET(policy, 0); - return (__db_norepmgr(dbenv)); -} - -/* - * PUBLIC: #ifndef HAVE_REPLICATION_THREADS - * PUBLIC: int __repmgr_set_local_site - * PUBLIC: __P((DB_ENV *, const char *, u_int, u_int32_t)); - * PUBLIC: #endif - */ -int -__repmgr_set_local_site(dbenv, host, port, flags) - DB_ENV *dbenv; - const char *host; - u_int port; - u_int32_t flags; -{ - COMPQUIET(host, NULL); - COMPQUIET(port, 0); - COMPQUIET(flags, 0); - return (__db_norepmgr(dbenv)); -} - -/* - * PUBLIC: #ifndef HAVE_REPLICATION_THREADS - * PUBLIC: int __repmgr_site_list __P((DB_ENV *, u_int *, DB_REPMGR_SITE **)); - * PUBLIC: #endif - */ -int -__repmgr_site_list(dbenv, countp, listp) - DB_ENV *dbenv; - u_int *countp; - DB_REPMGR_SITE **listp; -{ - COMPQUIET(countp, NULL); - COMPQUIET(listp, NULL); - return (__db_norepmgr(dbenv)); -} - -/* - * PUBLIC: #ifndef HAVE_REPLICATION_THREADS - * PUBLIC: int __repmgr_start __P((DB_ENV *, int, u_int32_t)); - * PUBLIC: #endif - */ -int -__repmgr_start(dbenv, nthreads, flags) - DB_ENV *dbenv; - int nthreads; - u_int32_t flags; -{ - COMPQUIET(nthreads, 0); - COMPQUIET(flags, 0); - return (__db_norepmgr(dbenv)); -} - -/* - * PUBLIC: #ifndef HAVE_REPLICATION_THREADS - * PUBLIC: int __repmgr_stat_pp __P((DB_ENV *, DB_REPMGR_STAT **, u_int32_t)); - * PUBLIC: #endif - */ -int -__repmgr_stat_pp(dbenv, statp, flags) - DB_ENV *dbenv; - DB_REPMGR_STAT **statp; - u_int32_t flags; -{ - COMPQUIET(statp, NULL); - COMPQUIET(flags, 0); - return (__db_norepmgr(dbenv)); -} - -/* - * PUBLIC: #ifndef HAVE_REPLICATION_THREADS - * PUBLIC: int __repmgr_stat_print_pp __P((DB_ENV *, u_int32_t)); - * PUBLIC: #endif - */ -int -__repmgr_stat_print_pp(dbenv, flags) - DB_ENV *dbenv; - u_int32_t flags; -{ - COMPQUIET(flags, 0); - return (__db_norepmgr(dbenv)); -} - -/* - * PUBLIC: #ifndef HAVE_REPLICATION_THREADS - * PUBLIC: int __repmgr_handle_event __P((DB_ENV *, u_int32_t, void *)); - * PUBLIC: #endif - */ -int -__repmgr_handle_event(dbenv, event, info) - DB_ENV *dbenv; - u_int32_t event; - void *info; -{ - COMPQUIET(dbenv, NULL); - COMPQUIET(event, 0); - COMPQUIET(info, NULL); - - /* - * It's not an error for this function to be called. Replication calls - * this to let repmgr handle events. If repmgr isn't part of the build, - * all replication events should be forwarded to the application. - */ - return (DB_EVENT_NOT_HANDLED); -} -#endif /* !HAVE_REPLICATION_THREADS */ diff --git a/db/repmgr/repmgr_util.c b/db/repmgr/repmgr_util.c deleted file mode 100644 index b6cbf3c9c..000000000 --- a/db/repmgr/repmgr_util.c +++ /dev/null @@ -1,426 +0,0 @@ -/*- - * See the file LICENSE for redistribution information. - * - * Copyright (c) 2005,2007 Oracle. All rights reserved. - * - * $Id: repmgr_util.c,v 1.34 2007/06/11 18:29:34 alanb Exp $ - */ - -#include "db_config.h" - -#define __INCLUDE_NETWORKING 1 -#include "db_int.h" - -/* - * Schedules a future attempt to re-establish a connection with the given site. - * Usually, we wait the configured retry_wait period. But if the "immediate" - * parameter is given as TRUE, we'll make the wait time 0, and put the request - * at the _beginning_ of the retry queue. Note how this allows us to preserve - * the property that the queue stays in time order simply by appending to the - * end. - * - * PUBLIC: int __repmgr_schedule_connection_attempt __P((DB_ENV *, u_int, int)); - * - * !!! - * Caller should hold mutex. - * - * Unless an error occurs, we always attempt to wake the main thread; - * __repmgr_bust_connection relies on this behavior. - */ -int -__repmgr_schedule_connection_attempt(dbenv, eid, immediate) - DB_ENV *dbenv; - u_int eid; - int immediate; -{ - DB_REP *db_rep; - REPMGR_SITE *site; - REPMGR_RETRY *retry; - db_timespec t, v; - int ret; - - db_rep = dbenv->rep_handle; - if ((ret = __os_malloc(dbenv, sizeof(*retry), &retry)) != 0) - return (ret); - - __os_gettime(dbenv, &t); - if (immediate) - TAILQ_INSERT_HEAD(&db_rep->retries, retry, entries); - else { - DB_TIMEOUT_TO_TIMESPEC(db_rep->connection_retry_wait, &v); - timespecadd(&t, &v); - TAILQ_INSERT_TAIL(&db_rep->retries, retry, entries); - } - retry->eid = eid; - retry->time = t; - - site = SITE_FROM_EID(eid); - site->state = SITE_IDLE; - site->ref.retry = retry; - - return (__repmgr_wake_main_thread(dbenv)); -} - -/* - * Initialize the necessary control structures to begin reading a new input - * message. - * - * PUBLIC: void __repmgr_reset_for_reading __P((REPMGR_CONNECTION *)); - */ -void -__repmgr_reset_for_reading(con) - REPMGR_CONNECTION *con; -{ - con->reading_phase = SIZES_PHASE; - __repmgr_iovec_init(&con->iovecs); - __repmgr_add_buffer(&con->iovecs, &con->msg_type, - sizeof(con->msg_type)); - __repmgr_add_buffer(&con->iovecs, &con->control_size_buf, - sizeof(con->control_size_buf)); - __repmgr_add_buffer(&con->iovecs, &con->rec_size_buf, - sizeof(con->rec_size_buf)); -} - -/* - * Constructs a DB_REPMGR_CONNECTION structure, and puts it on the main list of - * connections. It does not initialize eid, since that isn't needed and/or - * immediately known in all cases. - * - * PUBLIC: int __repmgr_new_connection __P((DB_ENV *, REPMGR_CONNECTION **, - * PUBLIC: socket_t, u_int32_t)); - */ -int -__repmgr_new_connection(dbenv, connp, s, flags) - DB_ENV *dbenv; - REPMGR_CONNECTION **connp; - socket_t s; - u_int32_t flags; -{ - DB_REP *db_rep; - REPMGR_CONNECTION *c; - int ret; - - db_rep = dbenv->rep_handle; - if ((ret = __os_malloc(dbenv, sizeof(REPMGR_CONNECTION), &c)) != 0) - return (ret); - - c->fd = s; - c->flags = flags; - - STAILQ_INIT(&c->outbound_queue); - c->out_queue_length = 0; - - __repmgr_reset_for_reading(c); - TAILQ_INSERT_TAIL(&db_rep->connections, c, entries); - *connp = c; - - return (0); -} - -/* - * PUBLIC: int __repmgr_new_site __P((DB_ENV *, REPMGR_SITE**, - * PUBLIC: const repmgr_netaddr_t *, int)); - * - * !!! - * Caller must hold mutex. - */ -int -__repmgr_new_site(dbenv, sitep, addr, state) - DB_ENV *dbenv; - REPMGR_SITE **sitep; - const repmgr_netaddr_t *addr; - int state; -{ - DB_REP *db_rep; - REPMGR_SITE *site; - SITE_STRING_BUFFER buffer; - u_int new_site_max, eid; - int ret; - - db_rep = dbenv->rep_handle; - if (db_rep->site_cnt >= db_rep->site_max) { -#define INITIAL_SITES_ALLOCATION 10 /* Arbitrary guess. */ - new_site_max = db_rep->site_max == 0 ? - INITIAL_SITES_ALLOCATION : db_rep->site_max * 2; - if ((ret = __os_realloc(dbenv, - sizeof(REPMGR_SITE) * new_site_max, &db_rep->sites)) != 0) - return (ret); - db_rep->site_max = new_site_max; - } - eid = db_rep->site_cnt++; - - site = &db_rep->sites[eid]; - - memcpy(&site->net_addr, addr, sizeof(*addr)); - ZERO_LSN(site->max_ack); - site->priority = -1; /* OOB value indicates we don't yet know. */ - site->state = state; - - RPRINT(dbenv, (dbenv, "EID %u is assigned for %s", eid, - __repmgr_format_site_loc(site, buffer))); - *sitep = site; - return (0); -} - -/* - * Destructor for a repmgr_netaddr_t, cleans up any allocated memory pointed to - * by the addr. - * - * PUBLIC: void __repmgr_cleanup_netaddr __P((DB_ENV *, repmgr_netaddr_t *)); - */ -void -__repmgr_cleanup_netaddr(dbenv, addr) - DB_ENV *dbenv; - repmgr_netaddr_t *addr; -{ - if (addr->address_list != NULL) { - __db_freeaddrinfo(dbenv, addr->address_list); - addr->address_list = addr->current = NULL; - } - if (addr->host != NULL) { - __os_free(dbenv, addr->host); - addr->host = NULL; - } -} - -/* - * PUBLIC: void __repmgr_iovec_init __P((REPMGR_IOVECS *)); - */ -void -__repmgr_iovec_init(v) - REPMGR_IOVECS *v; -{ - v->offset = v->count = 0; - v->total_bytes = 0; -} - -/* - * PUBLIC: void __repmgr_add_buffer __P((REPMGR_IOVECS *, void *, size_t)); - * - * !!! - * There is no checking for overflow of the vectors[5] array. - */ -void -__repmgr_add_buffer(v, address, length) - REPMGR_IOVECS *v; - void *address; - size_t length; -{ - v->vectors[v->count].iov_base = address; - v->vectors[v->count++].iov_len = length; - v->total_bytes += length; -} - -/* - * PUBLIC: void __repmgr_add_dbt __P((REPMGR_IOVECS *, const DBT *)); - */ -void -__repmgr_add_dbt(v, dbt) - REPMGR_IOVECS *v; - const DBT *dbt; -{ - v->vectors[v->count].iov_base = dbt->data; - v->vectors[v->count++].iov_len = dbt->size; - v->total_bytes += dbt->size; -} - -/* - * Update a set of iovecs to reflect the number of bytes transferred in an I/O - * operation, so that the iovecs can be used to continue transferring where we - * left off. - * Returns TRUE if the set of buffers is now fully consumed, FALSE if more - * remains. - * - * PUBLIC: int __repmgr_update_consumed __P((REPMGR_IOVECS *, size_t)); - */ -int -__repmgr_update_consumed(v, byte_count) - REPMGR_IOVECS *v; - size_t byte_count; -{ - db_iovec_t *iov; - int i; - - for (i = v->offset; ; i++) { - DB_ASSERT(NULL, i < v->count && byte_count > 0); - iov = &v->vectors[i]; - if (byte_count > iov->iov_len) { - /* - * We've consumed (more than) this vector's worth. - * Adjust count and continue. - */ - byte_count -= iov->iov_len; - } else { - /* Adjust length of remaining portion of vector. */ - iov->iov_len -= byte_count; - if (iov->iov_len > 0) { - /* - * Still some left in this vector. Adjust base - * address too, and leave offset pointing here. - */ - iov->iov_base = (void *) - ((u_int8_t *)iov->iov_base + byte_count); - v->offset = i; - } else { - /* - * Consumed exactly to a vector boundary. - * Advance to next vector for next time. - */ - v->offset = i+1; - } - /* - * If offset has reached count, the entire thing is - * consumed. - */ - return (v->offset >= v->count); - } - } -} - -/* - * Builds a buffer containing our network address information, suitable for - * publishing as cdata via a call to rep_start, and sets up the given DBT to - * point to it. The buffer is dynamically allocated memory, and the caller must - * assume responsibility for it. - * - * PUBLIC: int __repmgr_prepare_my_addr __P((DB_ENV *, DBT *)); - */ -int -__repmgr_prepare_my_addr(dbenv, dbt) - DB_ENV *dbenv; - DBT *dbt; -{ - DB_REP *db_rep; - size_t size, hlen; - u_int16_t port_buffer; - u_int8_t *ptr; - int ret; - - db_rep = dbenv->rep_handle; - - /* - * The cdata message consists of the 2-byte port number, in network byte - * order, followed by the null-terminated host name string. - */ - port_buffer = htons(db_rep->my_addr.port); - size = sizeof(port_buffer) + - (hlen = strlen(db_rep->my_addr.host) + 1); - if ((ret = __os_malloc(dbenv, size, &ptr)) != 0) - return (ret); - - DB_INIT_DBT(*dbt, ptr, size); - - memcpy(ptr, &port_buffer, sizeof(port_buffer)); - ptr = &ptr[sizeof(port_buffer)]; - memcpy(ptr, db_rep->my_addr.host, hlen); - - return (0); -} - -/* - * Provide the appropriate value for nsites, the number of sites in the - * replication group. If the application has specified a value, use that. - * Otherwise, just use the number of sites we know of. - * - * PUBLIC: u_int __repmgr_get_nsites __P((DB_REP *)); - */ -u_int -__repmgr_get_nsites(db_rep) - DB_REP *db_rep; -{ - if (db_rep->config_nsites > 0) - return ((u_int)db_rep->config_nsites); - - /* - * The number of other sites in our table, plus 1 to count ourself. - */ - return (db_rep->site_cnt + 1); -} - -/* - * PUBLIC: void __repmgr_thread_failure __P((DB_ENV *, int)); - */ -void -__repmgr_thread_failure(dbenv, why) - DB_ENV *dbenv; - int why; -{ - (void)__repmgr_stop_threads(dbenv); - (void)__db_panic(dbenv, why); -} - -/* - * Format a printable representation of a site location, suitable for inclusion - * in an error message. The buffer must be at least as big as - * MAX_SITE_LOC_STRING. - * - * PUBLIC: char *__repmgr_format_eid_loc __P((DB_REP *, int, char *)); - */ -char * -__repmgr_format_eid_loc(db_rep, eid, buffer) - DB_REP *db_rep; - int eid; - char *buffer; -{ - if (IS_VALID_EID(eid)) - return (__repmgr_format_site_loc(SITE_FROM_EID(eid), buffer)); - - snprintf(buffer, MAX_SITE_LOC_STRING, "(unidentified site)"); - return (buffer); -} - -/* - * PUBLIC: char *__repmgr_format_site_loc __P((REPMGR_SITE *, char *)); - */ -char * -__repmgr_format_site_loc(site, buffer) - REPMGR_SITE *site; - char *buffer; -{ - snprintf(buffer, MAX_SITE_LOC_STRING, "site %s:%lu", - site->net_addr.host, (u_long)site->net_addr.port); - return (buffer); -} - -/* - * __repmgr_timespec_diff_now -- - * Calculate the time duration from now til "when". - * - * PUBLIC: void __repmgr_timespec_diff_now - * PUBLIC: __P((DB_ENV *, db_timespec *, db_timespec *)); - */ -void -__repmgr_timespec_diff_now(dbenv, when, result) - DB_ENV *dbenv; - db_timespec *when, *result; -{ - db_timespec now; - - __os_gettime(dbenv, &now); - if (timespeccmp(&now, when, >=)) - timespecclear(result); - else { - *result = *when; - timespecsub(result, &now); - } -} - -/* - * PUBLIC: int __repmgr_repstart __P((DB_ENV *, u_int32_t)); - */ -int -__repmgr_repstart(dbenv, flags) - DB_ENV *dbenv; - u_int32_t flags; -{ - DBT my_addr; - int ret; - - if ((ret = __repmgr_prepare_my_addr(dbenv, &my_addr)) != 0) - return (ret); - ret = __rep_start(dbenv, &my_addr, flags); - __os_free(dbenv, my_addr.data); - if (ret != 0) - __db_err(dbenv, ret, "rep_start"); - return (ret); -} diff --git a/db/repmgr/repmgr_windows.c b/db/repmgr/repmgr_windows.c deleted file mode 100644 index c48f94e43..000000000 --- a/db/repmgr/repmgr_windows.c +++ /dev/null @@ -1,715 +0,0 @@ -/*- - * See the file LICENSE for redistribution information. - * - * Copyright (c) 2005,2007 Oracle. All rights reserved. - * - * $Id: repmgr_windows.c,v 1.22 2007/06/11 18:29:34 alanb Exp $ - */ - -#include "db_config.h" - -#define __INCLUDE_NETWORKING 1 -#include "db_int.h" - -typedef struct __ack_waiter { - HANDLE event; - const DB_LSN *lsnp; - struct __ack_waiter *next_free; -} ACK_WAITER; - -#define WAITER_SLOT_IN_USE(w) ((w)->lsnp != NULL) - -/* - * Array slots [0:next_avail-1] are initialized, and either in use or on the - * free list. Slots beyond that are virgin territory, whose memory contents - * could be garbage. In particular, note that slots [0:next_avail-1] have a - * Win32 Event Object created for them, which have to be freed when cleaning up - * this data structure. - * - * "first_free" points to a list of not-in-use slots threaded through the first - * section of the array. - */ -struct __ack_waiters_table { - struct __ack_waiter *array; - int size; - int next_avail; - struct __ack_waiter *first_free; -}; - -static int allocate_wait_slot __P((DB_ENV *, ACK_WAITER **)); -static void free_wait_slot __P((DB_ENV *, ACK_WAITER *)); -static int handle_completion __P((DB_ENV *, REPMGR_CONNECTION *)); -static int finish_connecting __P((DB_ENV *, REPMGR_CONNECTION *, - LPWSANETWORKEVENTS)); - -int -__repmgr_thread_start(dbenv, runnable) - DB_ENV *dbenv; - REPMGR_RUNNABLE *runnable; -{ - HANDLE thread_id; - - runnable->finished = FALSE; - - thread_id = CreateThread(NULL, 0, - (LPTHREAD_START_ROUTINE)runnable->run, dbenv, 0, NULL); - if (thread_id == NULL) - return (GetLastError()); - runnable->thread_id = thread_id; - return (0); -} - -int -__repmgr_thread_join(thread) - REPMGR_RUNNABLE *thread; -{ - if (WaitForSingleObject(thread->thread_id, INFINITE) == WAIT_OBJECT_0) - return (0); - return (GetLastError()); -} - -int -__repmgr_set_nonblocking(s) - SOCKET s; -{ - int ret; - u_long arg; - - arg = 1; /* any non-zero value */ - if ((ret = ioctlsocket(s, FIONBIO, &arg)) == SOCKET_ERROR) - return (WSAGetLastError()); - return (0); -} - -/* - * Wake any send()-ing threads waiting for an acknowledgement. - * - * !!! - * Caller must hold the repmgr->mutex, if this thread synchronization is to work - * properly. - */ -int -__repmgr_wake_waiting_senders(dbenv) - DB_ENV *dbenv; -{ - DB_REP *db_rep; - ACK_WAITER *slot; - int i, ret; - - ret = 0; - db_rep = dbenv->rep_handle; - for (i=0; i<db_rep->waiters->next_avail; i++) { - slot = &db_rep->waiters->array[i]; - if (!WAITER_SLOT_IN_USE(slot)) - continue; - if (__repmgr_is_permanent(dbenv, slot->lsnp)) - if (!SetEvent(slot->event) && ret == 0) - ret = GetLastError(); - } - return (ret); -} - -/* - * !!! - * Caller must hold mutex. - */ -int -__repmgr_await_ack(dbenv, lsnp) - DB_ENV *dbenv; - const DB_LSN *lsnp; -{ - DB_REP *db_rep; - ACK_WAITER *me; - DWORD ret; - DWORD timeout; - - db_rep = dbenv->rep_handle; - - if ((ret = allocate_wait_slot(dbenv, &me)) != 0) - goto err; - - /* convert time-out from microseconds to milliseconds, rounding up */ - timeout = db_rep->ack_timeout > 0 ? - ((db_rep->ack_timeout + (US_PER_MS - 1)) / US_PER_MS) : INFINITE; - me->lsnp = lsnp; - if ((ret = SignalObjectAndWait(db_rep->mutex, me->event, timeout, - FALSE)) == WAIT_FAILED) { - ret = GetLastError(); - } else if (ret == WAIT_TIMEOUT) - ret = DB_REP_UNAVAIL; - else - DB_ASSERT(dbenv, ret == WAIT_OBJECT_0); - - LOCK_MUTEX(db_rep->mutex); - free_wait_slot(dbenv, me); - -err: - return (ret); -} - -/* - * !!! - * Caller must hold the mutex. - */ -static int -allocate_wait_slot(dbenv, resultp) - DB_ENV *dbenv; - ACK_WAITER **resultp; -{ - DB_REP *db_rep; - ACK_WAITERS_TABLE *table; - ACK_WAITER *w; - int ret; - - db_rep = dbenv->rep_handle; - table = db_rep->waiters; - if (table->first_free == NULL) { - if (table->next_avail >= table->size) { - /* - * Grow the array. - */ - table->size *= 2; - w = table->array; - if ((ret = __os_realloc(dbenv, table->size * sizeof(*w), - &w)) != 0) - return (ret); - table->array = w; - } - /* - * Here if, one way or another, we're good to go for using the - * next slot (for the first time). - */ - w = &table->array[table->next_avail++]; - if ((w->event = CreateEvent(NULL, FALSE, FALSE, NULL)) == - NULL) { - /* - * Maintain the sanctity of our rule that - * [0:next_avail-1] contain valid Event Objects. - */ - --table->next_avail; - return (GetLastError()); - } - } else { - w = table->first_free; - table->first_free = w->next_free; - } - *resultp = w; - return (0); -} - -static void -free_wait_slot(dbenv, slot) - DB_ENV *dbenv; - ACK_WAITER *slot; -{ - DB_REP *db_rep; - - db_rep = dbenv->rep_handle; - - slot->lsnp = NULL; /* show it's not in use */ - slot->next_free = db_rep->waiters->first_free; - db_rep->waiters->first_free = slot; -} - -/* - * Make resource allocation an all-or-nothing affair, outside of this and the - * close_sync function. db_rep->waiters should be non-NULL iff all of these - * resources have been created. - */ -int -__repmgr_init_sync(dbenv, db_rep) - DB_ENV *dbenv; - DB_REP *db_rep; -{ -#define INITIAL_ALLOCATION 5 /* arbitrary size */ - ACK_WAITERS_TABLE *table; - int ret; - - db_rep->signaler = db_rep->queue_nonempty = db_rep->check_election = - db_rep->mutex = NULL; - table = NULL; - - if ((db_rep->signaler = CreateEvent(NULL, /* security attr */ - FALSE, /* (not) of the manual reset variety */ - FALSE, /* (not) initially signaled */ - NULL)) == NULL) /* name */ - goto geterr; - - if ((db_rep->queue_nonempty = CreateEvent(NULL, TRUE, FALSE, NULL)) - == NULL) - goto geterr; - - if ((db_rep->check_election = CreateEvent(NULL, FALSE, FALSE, NULL)) - == NULL) - goto geterr; - - if ((db_rep->mutex = CreateMutex(NULL, FALSE, NULL)) == NULL) - goto geterr; - - if ((ret = __os_calloc(dbenv, 1, sizeof(ACK_WAITERS_TABLE), &table)) - != 0) - goto err; - - if ((ret = __os_calloc(dbenv, INITIAL_ALLOCATION, sizeof(ACK_WAITER), - &table->array)) != 0) - goto err; - - table->size = INITIAL_ALLOCATION; - table->first_free = NULL; - table->next_avail = 0; - - /* There's a restaurant joke in there somewhere. */ - db_rep->waiters = table; - return (0); - -geterr: - ret = GetLastError(); -err: - if (db_rep->check_election != NULL) - CloseHandle(db_rep->check_election); - if (db_rep->queue_nonempty != NULL) - CloseHandle(db_rep->queue_nonempty); - if (db_rep->signaler != NULL) - CloseHandle(db_rep->signaler); - if (db_rep->mutex != NULL) - CloseHandle(db_rep->mutex); - if (table != NULL) - __os_free(dbenv, table); - db_rep->waiters = NULL; - return (ret); -} - -int -__repmgr_close_sync(dbenv) - DB_ENV *dbenv; -{ - DB_REP *db_rep; - int i, ret; - - db_rep = dbenv->rep_handle; - if (!(REPMGR_SYNC_INITED(db_rep))) - return (0); - - ret = 0; - for (i = 0; i < db_rep->waiters->next_avail; i++) { - if (!CloseHandle(db_rep->waiters->array[i].event) && ret == 0) - ret = GetLastError(); - } - __os_free(dbenv, db_rep->waiters->array); - __os_free(dbenv, db_rep->waiters); - - if (!CloseHandle(db_rep->check_election) && ret == 0) - ret = GetLastError(); - - if (!CloseHandle(db_rep->queue_nonempty) && ret == 0) - ret = GetLastError(); - - if (!CloseHandle(db_rep->signaler) && ret == 0) - ret = GetLastError(); - - if (!CloseHandle(db_rep->mutex) && ret == 0) - ret = GetLastError(); - - db_rep->waiters = NULL; - return (ret); -} - -/* - * Performs net-related resource initialization other than memory initialization - * and allocation. A valid db_rep->listen_fd acts as the "all-or-nothing" - * sentinel signifying that these resources are allocated (except that now the - * new wsa_inited flag may be used to indicate that WSAStartup has already been - * called). - */ -int -__repmgr_net_init(dbenv, db_rep) - DB_ENV *dbenv; - DB_REP *db_rep; -{ - int ret; - - /* Initialize the Windows sockets DLL. */ - if (!db_rep->wsa_inited && (ret = __repmgr_wsa_init(dbenv)) != 0) - goto err; - - if ((ret = __repmgr_listen(dbenv)) == 0) - return (0); - - if (WSACleanup() == SOCKET_ERROR) { - ret = net_errno; - __db_err(dbenv, ret, "WSACleanup"); - } - -err: db_rep->listen_fd = INVALID_SOCKET; - return (ret); -} - -/* - * __repmgr_wsa_init -- - * Initialize the Windows sockets DLL. - * - * PUBLIC: int __repmgr_wsa_init __P((DB_ENV *)); - */ -int -__repmgr_wsa_init(dbenv) - DB_ENV *dbenv; -{ - DB_REP *db_rep; - WSADATA wsaData; - int ret; - - db_rep = dbenv->rep_handle; - - if ((ret = WSAStartup(MAKEWORD(2, 2), &wsaData)) != 0) { - __db_err(dbenv, ret, "unable to initialize Windows networking"); - return (ret); - } - db_rep->wsa_inited = TRUE; - - return (0); -} - -int -__repmgr_lock_mutex(mutex) - mgr_mutex_t *mutex; -{ - if (WaitForSingleObject(*mutex, INFINITE) == WAIT_OBJECT_0) - return (0); - return (GetLastError()); -} - -int -__repmgr_unlock_mutex(mutex) - mgr_mutex_t *mutex; -{ - if (ReleaseMutex(*mutex)) - return (0); - return (GetLastError()); -} - -int -__repmgr_signal(v) - cond_var_t *v; -{ - return (SetEvent(*v) ? 0 : GetLastError()); -} - -int -__repmgr_wake_main_thread(dbenv) - DB_ENV *dbenv; -{ - if (!SetEvent(dbenv->rep_handle->signaler)) - return (GetLastError()); - return (0); -} - -int -__repmgr_writev(fd, iovec, buf_count, byte_count_p) - socket_t fd; - db_iovec_t *iovec; - int buf_count; - size_t *byte_count_p; -{ - DWORD bytes; - - if (WSASend(fd, iovec, - (DWORD)buf_count, &bytes, 0, NULL, NULL) == SOCKET_ERROR) - return (net_errno); - - *byte_count_p = (size_t)bytes; - return (0); -} - -int -__repmgr_readv(fd, iovec, buf_count, xfr_count_p) - socket_t fd; - db_iovec_t *iovec; - int buf_count; - size_t *xfr_count_p; -{ - DWORD bytes, flags; - - flags = 0; - if (WSARecv(fd, iovec, - (DWORD)buf_count, &bytes, &flags, NULL, NULL) == SOCKET_ERROR) - return (net_errno); - - *xfr_count_p = (size_t)bytes; - return (0); -} - -int -__repmgr_select_loop(dbenv) - DB_ENV *dbenv; -{ - DWORD select_timeout; - DB_REP *db_rep; - REPMGR_CONNECTION *conn, *next; - REPMGR_RETRY *retry; - WSAEVENT events[WSA_MAXIMUM_WAIT_EVENTS]; - REPMGR_CONNECTION *connections[WSA_MAXIMUM_WAIT_EVENTS]; - DWORD nevents, ret; - db_timespec timeout; - WSAEVENT listen_event; - WSANETWORKEVENTS net_events; - int flow_control, i; - - db_rep = dbenv->rep_handle; - - if ((listen_event = WSACreateEvent()) == WSA_INVALID_EVENT) { - __db_err( - dbenv, net_errno, "can't create event for listen socket"); - return (net_errno); - } - if (WSAEventSelect(db_rep->listen_fd, listen_event, FD_ACCEPT) == - SOCKET_ERROR) { - ret = net_errno; - __db_err(dbenv, ret, "can't enable event for listener"); - goto out; - } - - LOCK_MUTEX(db_rep->mutex); - if ((ret = __repmgr_first_try_connections(dbenv)) != 0) - goto unlock; - flow_control = FALSE; - for (;;) { - /* Start with the two events that we always wait for. */ - events[0] = db_rep->signaler; - events[1] = listen_event; - nevents = 2; - - /* - * Add an event for each surviving socket that we're interested - * in. (For now [until we implement flow control], that's all - * of them, in one form or another.) - * Note that even if we're suffering flow control, we - * nevertheless still read if we haven't even yet gotten a - * handshake. Why? (1) Handshakes are important; and (2) they - * don't hurt anything flow-control-wise. - */ - TAILQ_FOREACH(conn, &db_rep->connections, entries) { - if (F_ISSET(conn, CONN_CONNECTING) || - !STAILQ_EMPTY(&conn->outbound_queue) || - (!flow_control || !IS_VALID_EID(conn->eid))) { - events[nevents] = conn->event_object; - connections[nevents++] = conn; - } - } - - /* - * Decide how long to wait based on when it will next be time to - * retry an idle connection. (List items are in order, so we - * only have to examine the first one.) - */ - if (TAILQ_EMPTY(&db_rep->retries)) - select_timeout = WSA_INFINITE; - else { - retry = TAILQ_FIRST(&db_rep->retries); - - __repmgr_timespec_diff_now( - dbenv, &retry->time, &timeout); - select_timeout = - (DWORD)(timeout.tv_sec * MS_PER_SEC + - timeout.tv_nsec / NS_PER_MS); - } - - UNLOCK_MUTEX(db_rep->mutex); - ret = WSAWaitForMultipleEvents( - nevents, events, FALSE, select_timeout, FALSE); - if (db_rep->finished) { - ret = 0; - goto out; - } - LOCK_MUTEX(db_rep->mutex); - - /* - * The first priority thing we must do is to clean up any - * pending defunct connections. Otherwise, if they have any - * lingering pending input, we get very confused if we try to - * process it. - * Loop just like TAILQ_FOREACH, except that we need to be - * able to unlink a list entry. - */ - for (conn = TAILQ_FIRST(&db_rep->connections); - conn != NULL; - conn = next) { - next = TAILQ_NEXT(conn, entries); - if (F_ISSET(conn, CONN_DEFUNCT)) - __repmgr_cleanup_connection(dbenv, conn); - } - - /* - * !!! - * Note that `ret' remains set as the return code from - * WSAWaitForMultipleEvents, above. - */ - if (ret >= WSA_WAIT_EVENT_0 && - ret < WSA_WAIT_EVENT_0 + nevents) { - switch (i = ret - WSA_WAIT_EVENT_0) { - case 0: - /* Another thread woke us. */ - break; - case 1: - if ((ret = WSAEnumNetworkEvents( - db_rep->listen_fd, listen_event, - &net_events)) == SOCKET_ERROR) { - ret = net_errno; - goto unlock; - } - DB_ASSERT(dbenv, - net_events.lNetworkEvents & FD_ACCEPT); - if ((ret = net_events.iErrorCode[FD_ACCEPT_BIT]) - != 0) - goto unlock; - if ((ret = __repmgr_accept(dbenv)) != 0) - goto unlock; - break; - default: - if ((ret = handle_completion(dbenv, - connections[i])) != 0) - goto unlock; - break; - } - } else if (ret == WSA_WAIT_TIMEOUT) { - if ((ret = __repmgr_retry_connections(dbenv)) != 0) - goto unlock; - } else if (ret == WSA_WAIT_FAILED) { - ret = net_errno; - goto unlock; - } - } - -unlock: - UNLOCK_MUTEX(db_rep->mutex); -out: - if (!CloseHandle(listen_event) && ret == 0) - ret = GetLastError(); - return (ret); -} - -/* - * !!! - * Only ever called on the select() thread, since we may call - * __repmgr_bust_connection(..., TRUE). - */ -static int -handle_completion(dbenv, conn) - DB_ENV *dbenv; - REPMGR_CONNECTION *conn; -{ - int ret; - WSANETWORKEVENTS events; - - if ((ret = WSAEnumNetworkEvents(conn->fd, conn->event_object, &events)) - == SOCKET_ERROR) { - __db_err(dbenv, net_errno, "EnumNetworkEvents"); - STAT(dbenv->rep_handle->region->mstat.st_connection_drop++); - ret = DB_REP_UNAVAIL; - goto err; - } - - if (F_ISSET(conn, CONN_CONNECTING)) { - if ((ret = finish_connecting(dbenv, conn, &events)) != 0) - goto err; - } else { /* Check both writing and reading. */ - if (events.lNetworkEvents & FD_CLOSE) { - __db_err(dbenv, - events.iErrorCode[FD_CLOSE_BIT], - "connection closed"); - STAT(dbenv->rep_handle-> - region->mstat.st_connection_drop++); - ret = DB_REP_UNAVAIL; - goto err; - } - - if (events.lNetworkEvents & FD_WRITE) { - if (events.iErrorCode[FD_WRITE_BIT] != 0) { - __db_err(dbenv, - events.iErrorCode[FD_WRITE_BIT], - "error writing"); - STAT(dbenv->rep_handle-> - region->mstat.st_connection_drop++); - ret = DB_REP_UNAVAIL; - goto err; - } else if ((ret = - __repmgr_write_some(dbenv, conn)) != 0) - goto err; - } - - if (events.lNetworkEvents & FD_READ) { - if (events.iErrorCode[FD_READ_BIT] != 0) { - __db_err(dbenv, - events.iErrorCode[FD_READ_BIT], - "error reading"); - STAT(dbenv->rep_handle-> - region->mstat.st_connection_drop++); - ret = DB_REP_UNAVAIL; - goto err; - } else if ((ret = - __repmgr_read_from_site(dbenv, conn)) != 0) - goto err; - } - } - - return (0); - -err: if (ret == DB_REP_UNAVAIL) - return (__repmgr_bust_connection(dbenv, conn, TRUE)); - return (ret); -} - -static int -finish_connecting(dbenv, conn, events) - DB_ENV *dbenv; - REPMGR_CONNECTION *conn; - LPWSANETWORKEVENTS events; -{ - DB_REP *db_rep; - u_int eid; -/* char reason[100]; */ - int ret/*, t_ret*/; -/* DWORD_PTR values[1]; */ - - if (!(events->lNetworkEvents & FD_CONNECT)) - return (0); - - F_CLR(conn, CONN_CONNECTING); - - if ((ret = events->iErrorCode[FD_CONNECT_BIT]) != 0) { -/* t_ret = FormatMessage( */ -/* FORMAT_MESSAGE_IGNORE_INSERTS | */ -/* FORMAT_MESSAGE_FROM_SYSTEM | */ -/* FORMAT_MESSAGE_ARGUMENT_ARRAY, */ -/* NULL, ret, 0, (LPTSTR)reason, sizeof(reason), values); */ -/* __db_err(dbenv/\*, ret*\/, "connecting: %s", */ -/* reason); */ -/* LocalFree(reason); */ - __db_err(dbenv, ret, "connecting"); - goto err; - } - - if (WSAEventSelect(conn->fd, conn->event_object, FD_READ | FD_CLOSE) == - SOCKET_ERROR) { - ret = net_errno; - __db_err(dbenv, ret, "setting event bits for reading"); - return (ret); - } - - return (__repmgr_send_handshake(dbenv, conn)); - -err: - db_rep = dbenv->rep_handle; - eid = conn->eid; - DB_ASSERT(dbenv, IS_VALID_EID(eid)); - - if (ADDR_LIST_NEXT(&SITE_FROM_EID(eid)->net_addr) == NULL) { - STAT(db_rep->region->mstat.st_connect_fail++); - return (DB_REP_UNAVAIL); - } - - DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections)); - __repmgr_cleanup_connection(dbenv, conn); - ret = __repmgr_connect_site(dbenv, eid); - DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL); - return (ret); -} |