diff options
author | Panu Matilainen <pmatilai@redhat.com> | 2007-07-16 16:48:14 +0300 |
---|---|---|
committer | Panu Matilainen <pmatilai@redhat.com> | 2007-07-16 16:48:14 +0300 |
commit | 2cfd3012bfcb5c5c61bbaf662ef084e0ab789d79 (patch) | |
tree | e12ee52087506ac8c7a5eee83b17497d98df2d40 /db/repmgr | |
parent | b754fe19fd387ca5fe8e7c00ddaa25c898fa192f (diff) | |
download | librpm-tizen-2cfd3012bfcb5c5c61bbaf662ef084e0ab789d79.tar.gz librpm-tizen-2cfd3012bfcb5c5c61bbaf662ef084e0ab789d79.tar.bz2 librpm-tizen-2cfd3012bfcb5c5c61bbaf662ef084e0ab789d79.zip |
Update internal BDB to version 4.5.20
Diffstat (limited to 'db/repmgr')
-rw-r--r-- | db/repmgr/repmgr_elect.c | 369 | ||||
-rw-r--r-- | db/repmgr/repmgr_method.c | 442 | ||||
-rw-r--r-- | db/repmgr/repmgr_msg.c | 333 | ||||
-rw-r--r-- | db/repmgr/repmgr_net.c | 1041 | ||||
-rw-r--r-- | db/repmgr/repmgr_posix.c | 714 | ||||
-rw-r--r-- | db/repmgr/repmgr_queue.c | 158 | ||||
-rw-r--r-- | db/repmgr/repmgr_sel.c | 875 | ||||
-rw-r--r-- | db/repmgr/repmgr_stat.c | 116 | ||||
-rw-r--r-- | db/repmgr/repmgr_util.c | 415 | ||||
-rw-r--r-- | db/repmgr/repmgr_windows.c | 708 |
10 files changed, 5171 insertions, 0 deletions
diff --git a/db/repmgr/repmgr_elect.c b/db/repmgr/repmgr_elect.c new file mode 100644 index 000000000..39ed1e863 --- /dev/null +++ b/db/repmgr/repmgr_elect.c @@ -0,0 +1,369 @@ +/*- + * See the file LICENSE for redistribution information. + * + * Copyright (c) 2005-2006 + * Oracle Corporation. All rights reserved. + * + * $Id: repmgr_elect.c,v 1.23 2006/09/12 01:06:34 alanb Exp $ + */ + +#include "db_config.h" + +#define __INCLUDE_NETWORKING 1 +#include "db_int.h" + +static int __repmgr_is_ready __P((DB_ENV *)); +static int __repmgr_elect_main __P((DB_ENV *)); +static void *__repmgr_elect_thread __P((void *)); +static int start_election_thread __P((DB_ENV *)); + +/* + * Starts the election thread, or wakes up an existing one, starting off with + * the specified operation (an election, or a call to rep_start(CLIENT), or + * nothing). Avoid multiple concurrent elections. + * + * PUBLIC: int __repmgr_init_election __P((DB_ENV *, int)); + * + * !!! + * Caller must hold mutex. + */ +int +__repmgr_init_election(dbenv, initial_operation) + DB_ENV *dbenv; + int initial_operation; +{ + DB_REP *db_rep; + int ret; +#ifdef DIAGNOSTIC + DB_MSGBUF mb; +#endif + + db_rep = dbenv->rep_handle; + + if (db_rep->finished) { + RPRINT(dbenv, (dbenv, &mb, + "ignoring elect thread request %d; repmgr is finished", + initial_operation)); + return (0); + } + + db_rep->operation_needed = initial_operation; + if (db_rep->elect_thread == NULL) + ret = start_election_thread(dbenv); + else if (db_rep->elect_thread->finished) { + RPRINT(dbenv, (dbenv, &mb, "join dead elect thread")); + if ((ret = __repmgr_thread_join(db_rep->elect_thread)) != 0) + return (ret); + __os_free(dbenv, db_rep->elect_thread); + db_rep->elect_thread = NULL; + ret = start_election_thread(dbenv); + } else { + RPRINT(dbenv, (dbenv, &mb, "reusing existing elect thread")); + if ((ret = __repmgr_signal(&db_rep->check_election)) != 0) + __db_err(dbenv, ret, "can't signal election thread"); + } + return (ret); +} + +/* + * !!! + * Caller holds mutex. + */ +static int +start_election_thread(dbenv) + DB_ENV *dbenv; +{ + DB_REP *db_rep; + REPMGR_RUNNABLE *elector; + int ret; + + db_rep = dbenv->rep_handle; + + if ((ret = __os_malloc(dbenv, sizeof(REPMGR_RUNNABLE), &elector)) + != 0) + return (ret); + elector->dbenv = dbenv; + elector->run = __repmgr_elect_thread; + + if ((ret = __repmgr_thread_start(dbenv, elector)) == 0) + db_rep->elect_thread = elector; + else + __os_free(dbenv, elector); + + return (ret); +} + +static void * +__repmgr_elect_thread(args) + void *args; +{ + DB_ENV *dbenv = args; + int ret; +#ifdef DIAGNOSTIC + DB_MSGBUF mb; +#endif + + RPRINT(dbenv, (dbenv, &mb, "starting election thread")); + + if ((ret = __repmgr_elect_main(dbenv)) != 0) { + __db_err(dbenv, ret, "election thread failed"); + __repmgr_thread_failure(dbenv, ret); + } + + RPRINT(dbenv, (dbenv, &mb, "election thread is exiting")); + return (NULL); +} + +static int +__repmgr_elect_main(dbenv) + DB_ENV *dbenv; +{ + DB_REP *db_rep; + DBT my_addr; +#ifdef DB_WIN32 + DWORD duration; +#else + struct timespec deadline; +#endif + u_int nsites, nvotes; + int chosen_master, done, failure_recovery, last_op, ret, to_do; +#ifdef DIAGNOSTIC + DB_MSGBUF mb; +#endif + + db_rep = dbenv->rep_handle; + last_op = 0; + + /* + * db_rep->operation_needed is the mechanism by which the outside world + * (running in a different thread) tells us what it wants us to do. It + * is obviously relevant when we're just starting up. But it can also + * be set if a subsequent request for us to do something occurs while + * we're still looping. + * + * ELECT_FAILURE_ELECTION asks us to start by doing an election, but to + * do so in failure recovery mode. This failure recovery mode may + * persist through several loop iterations: as long as it takes us to + * succeed in finding a master, or until we get asked to perform a new + * request. Thus the time for mapping ELECT_FAILURE_ELECTION to the + * internal ELECT_ELECTION, as well as the setting of the failure + * recovery flag, is at the point we receive the new request from + * operation_needed (either here, or within the loop below). + */ + LOCK_MUTEX(db_rep->mutex); + if (db_rep->finished) { + db_rep->elect_thread->finished = TRUE; + UNLOCK_MUTEX(db_rep->mutex); + return (0); + } + to_do = db_rep->operation_needed; + db_rep->operation_needed = 0; + UNLOCK_MUTEX(db_rep->mutex); + if (to_do == ELECT_FAILURE_ELECTION) { + failure_recovery = TRUE; + to_do = ELECT_ELECTION; + } else + failure_recovery = FALSE; + + for (;;) { + RPRINT(dbenv, (dbenv, &mb, "elect thread to do: %d", to_do)); + switch (to_do) { + case ELECT_ELECTION: + nsites = __repmgr_get_nsites(db_rep); + + if (db_rep->init_policy == DB_REP_FULL_ELECTION && + !db_rep->found_master) + nvotes = nsites; + else { + nvotes = ELECTION_MAJORITY(nsites); + + /* + * If we're doing an election because we noticed + * that the master failed, it's reasonable to + * expect that the master won't participate. By + * not waiting for its vote, we can probably + * complete the election faster. But note that + * we shouldn't allow this to affect nvotes + * calculation. + */ + if (failure_recovery) { + nsites--; + + if (nsites == 1) { + /* + * We've just lost the only + * other site in the group, so + * there's no point in holding + * an election. + */ + if ((ret = + __repmgr_become_master( + dbenv)) != 0) + return (ret); + break; + } + } + } + + switch (ret = __rep_elect(dbenv, + (int)nsites, (int)nvotes, &chosen_master, 0)) { + case DB_REP_UNAVAIL: + break; + + case 0: + if (chosen_master == SELF_EID && + (ret = __repmgr_become_master(dbenv)) != 0) + return (ret); + break; + + default: + __db_err( + dbenv, ret, "unexpected election failure"); + return (ret); + } + last_op = ELECT_ELECTION; + break; + case ELECT_REPSTART: + if ((ret = + __repmgr_prepare_my_addr(dbenv, &my_addr)) != 0) + return (ret); + ret = __rep_start(dbenv, &my_addr, DB_REP_CLIENT); + __os_free(dbenv, my_addr.data); + if (ret != 0) { + __db_err(dbenv, ret, "rep_start"); + return (ret); + } + last_op = ELECT_REPSTART; + break; + case 0: + /* + * Nothing to do: this can happen the first time + * through, on initialization. + */ + last_op = 0; + break; + default: + DB_ASSERT(dbenv, FALSE); + } + + LOCK_MUTEX(db_rep->mutex); + while (!__repmgr_is_ready(dbenv)) { +#ifdef DB_WIN32 + duration = db_rep->election_retry_wait / 1000; + ret = SignalObjectAndWait(db_rep->mutex, + db_rep->check_election, duration, FALSE); + LOCK_MUTEX(db_rep->mutex); + if (ret == WAIT_TIMEOUT) + break; + DB_ASSERT(dbenv, ret == WAIT_OBJECT_0); +#else + __repmgr_compute_wait_deadline(dbenv, &deadline, + db_rep->election_retry_wait); + if ((ret = pthread_cond_timedwait( + &db_rep->check_election, &db_rep->mutex, &deadline)) + == ETIMEDOUT) + break; + DB_ASSERT(dbenv, ret == 0); +#endif + } + + /* + * Ways we can get here: time out, operation needed, master + * becomes valid, or thread shut-down command. + * + * If we're not yet done, figure out what to do next: if we've + * been told explicitly what to do (operation_needed), do that. + * Otherwise, what we do next is approximately the complement of + * what we just did; in other words, we alternate. + */ + done = IS_VALID_EID(db_rep->master_eid) || db_rep->finished; + if (done) + db_rep->elect_thread->finished = TRUE; + else if ((to_do = db_rep->operation_needed) == 0) { + if (last_op == ELECT_ELECTION) + to_do = ELECT_REPSTART; + else { + /* + * Generally, if what we previously did is a + * rep_start (or nothing, which really just + * means another thread did the rep_start before + * turning us on), then we next do an election. + * However, with the REP_CLIENT init policy we + * never do an initial election. + */ + to_do = ELECT_ELECTION; + if (db_rep->init_policy == DB_REP_CLIENT && + !db_rep->found_master) + to_do = ELECT_REPSTART; + } + } else { + db_rep->operation_needed = 0; + if (to_do == ELECT_FAILURE_ELECTION) { + failure_recovery = TRUE; + to_do = ELECT_ELECTION; + } else + failure_recovery = FALSE; + } + + /* + * TODO: is it possible for an operation_needed to be set, with + * nevertheless a valid master? I don't think so. Would a more + * straightforward exit test involve "operation_needed" instead + * of (or in addition to) valid master? + */ + + UNLOCK_MUTEX(db_rep->mutex); + if (done) + return (0); + } +} + +/* + * Tests whether the election thread is ready to do something, or if it should + * wait a little while. + */ +static int +__repmgr_is_ready(dbenv) + DB_ENV *dbenv; +{ + DB_REP *db_rep; +#ifdef DIAGNOSTIC + DB_MSGBUF mb; +#endif + + db_rep = dbenv->rep_handle; + + RPRINT(dbenv, (dbenv, &mb, + "repmgr elect: opcode %d, finished %d, master %d", + db_rep->operation_needed, db_rep->finished, db_rep->master_eid)); + + return (db_rep->operation_needed || + db_rep->finished || IS_VALID_EID(db_rep->master_eid)); +} + +/* + * PUBLIC: int __repmgr_become_master __P((DB_ENV *)); + */ +int +__repmgr_become_master(dbenv) + DB_ENV *dbenv; +{ + DB_REP *db_rep; + DBT my_addr; + int ret; + + db_rep = dbenv->rep_handle; + db_rep->master_eid = SELF_EID; + db_rep->found_master = TRUE; + + if ((ret = __repmgr_prepare_my_addr(dbenv, &my_addr)) != 0) + return (ret); + ret = __rep_start(dbenv, &my_addr, DB_REP_MASTER); + __os_free(dbenv, my_addr.data); + if (ret != 0) + return (ret); + if ((ret = __repmgr_stash_generation(dbenv)) != 0) + return (ret); + + return (0); +} diff --git a/db/repmgr/repmgr_method.c b/db/repmgr/repmgr_method.c new file mode 100644 index 000000000..d2fdb8eb9 --- /dev/null +++ b/db/repmgr/repmgr_method.c @@ -0,0 +1,442 @@ +/*- + * See the file LICENSE for redistribution information. + * + * Copyright (c) 2005-2006 + * Oracle Corporation. All rights reserved. + * + * $Id: repmgr_method.c,v 1.28 2006/09/11 15:15:20 bostic Exp $ + */ + +#include "db_config.h" + +#define __INCLUDE_NETWORKING 1 +#include "db_int.h" + +static int __repmgr_await_threads __P((DB_ENV *)); + +/* + * TODO: should (more of) this function be protected by mutex? Caution: calling + * rep_start while holding mutex doesn't work, 'cuz it pushes out a message to + * the send() function. + */ +/* + * PUBLIC: int __repmgr_start __P((DB_ENV *, int, u_int32_t)); + */ +int +__repmgr_start(dbenv, nthreads, flags) + DB_ENV *dbenv; + int nthreads; + u_int32_t flags; +{ + DB_REP *db_rep; + DBT my_addr; + REPMGR_RUNNABLE *selector, *messenger; + int ret, i; + + db_rep = dbenv->rep_handle; + + /* Check that the required initialization has been done. */ + if (db_rep->my_addr.port == 0) { + __db_errx(dbenv, + "repmgr_set_local_site must be called before repmgr_start"); + return (EINVAL); + } + + if (db_rep->selector != NULL || db_rep->finished) { + __db_errx(dbenv, + "DB_ENV->repmgr_start may not be called more than once"); + return (EINVAL); + } + + switch (flags) { + case DB_REP_CLIENT: + case DB_REP_ELECTION: + case DB_REP_FULL_ELECTION: + case DB_REP_MASTER: + break; + default: + __db_errx(dbenv, + "repmgr_start: unrecognized flags parameter value"); + return (EINVAL); + } + + if (nthreads <= 0) { + __db_errx(dbenv, + "repmgr_start: nthreads parameter must be >= 1"); + return (EINVAL); + } + + if ((ret = + __os_calloc(dbenv, (u_int)nthreads, sizeof(REPMGR_RUNNABLE *), + &db_rep->messengers)) != 0) + return (ret); + db_rep->nthreads = nthreads; + + if ((ret = __repmgr_net_init(dbenv, db_rep)) != 0 || + (ret = __repmgr_init_sync(dbenv, db_rep)) != 0 || + (ret = __rep_set_transport(dbenv, SELF_EID, __repmgr_send)) != 0) + return (ret); + + /* + * Make some sort of call to rep_start before starting other threads, to + * ensure that incoming messages being processed always have a rep + * context properly configured. + */ + if ((db_rep->init_policy = flags) == DB_REP_MASTER) + ret = __repmgr_become_master(dbenv); + else { + if ((ret = __repmgr_prepare_my_addr(dbenv, &my_addr)) != 0) + return (ret); + ret = __rep_start(dbenv, &my_addr, DB_REP_CLIENT); + __os_free(dbenv, my_addr.data); + if (ret == 0) { + LOCK_MUTEX(db_rep->mutex); + ret = __repmgr_init_election(dbenv, 0); + UNLOCK_MUTEX(db_rep->mutex); + } + } + if (ret != 0) + return (ret); + + if ((ret = __os_calloc(dbenv, 1, sizeof(REPMGR_RUNNABLE), &selector)) + != 0) + return (ret); + selector->dbenv = dbenv; + selector->run = __repmgr_select_thread; + if ((ret = __repmgr_thread_start(dbenv, selector)) != 0) { + __db_err(dbenv, ret, "can't start selector thread"); + __os_free(dbenv, selector); + return (ret); + } else + db_rep->selector = selector; + + for (i=0; i<nthreads; i++) { + if ((ret = __os_calloc(dbenv, 1, sizeof(REPMGR_RUNNABLE), + &messenger)) != 0) + return (ret); + + messenger->dbenv = dbenv; + messenger->run = __repmgr_msg_thread; + if ((ret = __repmgr_thread_start(dbenv, messenger)) != 0) { + __os_free(dbenv, messenger); + return (ret); + } + db_rep->messengers[i] = messenger; + } + + return (ret); +} + +/* + * PUBLIC: int __repmgr_close __P((DB_ENV *)); + */ +int +__repmgr_close(dbenv) + DB_ENV *dbenv; +{ + DB_REP *db_rep; + int ret, t_ret; +#ifdef DIAGNOSTIC + DB_MSGBUF mb; +#endif + + ret = 0; + db_rep = dbenv->rep_handle; + if (db_rep->selector != NULL) { + RPRINT(dbenv, (dbenv, &mb, "Stopping repmgr threads")); + ret = __repmgr_stop_threads(dbenv); + if ((t_ret = __repmgr_await_threads(dbenv)) != 0 && ret == 0) + ret = t_ret; + RPRINT(dbenv, (dbenv, &mb, "Repmgr threads are finished")); + } + + if ((t_ret = __repmgr_net_close(dbenv)) != 0 && ret == 0) + ret = t_ret; + + if ((t_ret = __repmgr_close_sync(dbenv)) != 0 && ret == 0) + ret = t_ret; + + return (ret); +} + +/* + * PUBLIC: int __repmgr_set_ack_policy __P((DB_ENV *, int)); + */ +int +__repmgr_set_ack_policy(dbenv, policy) + DB_ENV *dbenv; + int policy; +{ + switch (policy) { + case DB_REPMGR_ACKS_ALL: /* FALLTHROUGH */ + case DB_REPMGR_ACKS_ALL_PEERS: /* FALLTHROUGH */ + case DB_REPMGR_ACKS_NONE: /* FALLTHROUGH */ + case DB_REPMGR_ACKS_ONE: /* FALLTHROUGH */ + case DB_REPMGR_ACKS_ONE_PEER: /* FALLTHROUGH */ + case DB_REPMGR_ACKS_QUORUM: + dbenv->rep_handle->perm_policy = policy; + return (0); + default: + __db_errx(dbenv, + "Unknown ack_policy in DB_ENV->repmgr_set_ack_policy"); + return (EINVAL); + } +} + +/* + * PUBLIC: int __repmgr_get_ack_policy __P((DB_ENV *, int *)); + */ +int +__repmgr_get_ack_policy(dbenv, policy) + DB_ENV *dbenv; + int *policy; +{ + *policy = dbenv->rep_handle->perm_policy; + return (0); +} + +/* + * PUBLIC: int __repmgr_dbenv_create __P((DB_ENV *, DB_REP *)); + */ +int +__repmgr_dbenv_create(dbenv, db_rep) + DB_ENV *dbenv; + DB_REP *db_rep; +{ + int ret; + + /* Set some default values. */ + db_rep->elect_timeout = 2 * 1000000; /* 2 seconds */ + db_rep->ack_timeout = 1 * 1000000; /* 1 second */ + db_rep->connection_retry_wait = 30 * 1000000; /* 30 seconds */ + db_rep->election_retry_wait = 10 * 1000000; /* 10 seconds */ + db_rep->config_nsites = 0; + db_rep->peer = DB_EID_INVALID; + db_rep->perm_policy = DB_REPMGR_ACKS_QUORUM; + db_rep->my_priority = 100; + + /* + * TODO: OK, this has just crossed my pain tolerance threshold: I think + * this is just getting too unjustifiably complex. It's probably just + * to have an explicit flag of some sort indicating that initialization + * has been done, and maybe not even bother with the fine granularity of + * initializing net, sync and queue separately (at least in terms of + * letting them succeed or fail independently). + */ +#ifdef DB_WIN32 + db_rep->waiters = NULL; +#else + db_rep->read_pipe = db_rep->write_pipe = -1; +#endif + if ((ret = __repmgr_net_create(dbenv, db_rep)) == 0) + ret = __repmgr_queue_create(dbenv, db_rep); + + return (ret); +} + +/* + * PUBLIC: void __repmgr_dbenv_destroy __P((DB_ENV *, DB_REP *)); + */ +void +__repmgr_dbenv_destroy(dbenv, db_rep) + DB_ENV *dbenv; + DB_REP *db_rep; +{ + __repmgr_queue_destroy(dbenv); + __repmgr_net_destroy(dbenv, db_rep); + if (db_rep->messengers != NULL) { + __os_free(dbenv, db_rep->messengers); + db_rep->messengers = NULL; + } +} + +/* + * PUBLIC: int __repmgr_stop_threads __P((DB_ENV *)); + */ +int +__repmgr_stop_threads(dbenv) + DB_ENV *dbenv; +{ + DB_REP *db_rep; + int ret; + + db_rep = dbenv->rep_handle; + + /* + * Hold mutex for the purpose of waking up threads, but then get out of + * the way to let them clean up and exit. + */ + LOCK_MUTEX(db_rep->mutex); + db_rep->finished = TRUE; + if (db_rep->elect_thread != NULL && + (ret = __repmgr_signal(&db_rep->check_election)) != 0) + goto unlock; + + if ((ret = __repmgr_signal(&db_rep->queue_nonempty)) != 0) + goto unlock; + UNLOCK_MUTEX(db_rep->mutex); + + return (__repmgr_wake_main_thread(dbenv)); + +unlock: + UNLOCK_MUTEX(db_rep->mutex); + return (ret); +} + +static int +__repmgr_await_threads(dbenv) + DB_ENV *dbenv; +{ + DB_REP *db_rep; + REPMGR_RUNNABLE *messenger; + int ret, t_ret, i; + + db_rep = dbenv->rep_handle; + ret = 0; + if (db_rep->elect_thread != NULL) { + ret = __repmgr_thread_join(db_rep->elect_thread); + __os_free(dbenv, db_rep->elect_thread); + db_rep->elect_thread = NULL; + } + + /* TODO: if the join fails, how/when do we clean up the memory? */ + for (i=0; i<db_rep->nthreads && db_rep->messengers[i] != NULL; i++) { + messenger = db_rep->messengers[i]; + if ((t_ret = __repmgr_thread_join(messenger)) != 0 && ret == 0) + ret = t_ret; + __os_free(dbenv, messenger); + db_rep->messengers[i] = NULL; /* necessary? */ + } + __os_free(dbenv, db_rep->messengers); + db_rep->messengers = NULL; + + if (db_rep->selector != NULL) { + if ((t_ret = __repmgr_thread_join(db_rep->selector)) != 0 && + ret == 0) + ret = t_ret; + __os_free(dbenv, db_rep->selector); + db_rep->selector = NULL; + } + + return (ret); +} + +/* + * PUBLIC: int __repmgr_set_local_site __P((DB_ENV *, const char *, u_int, + * PUBLIC: u_int32_t)); + */ +int +__repmgr_set_local_site(dbenv, host, port, flags) + DB_ENV *dbenv; + const char *host; + u_int port; + u_int32_t flags; +{ + ADDRINFO *address_list; + DB_REP *db_rep; + repmgr_netaddr_t addr; + int locked, ret; + const char *sharable_host; + char buffer[MAXHOSTNAMELEN]; + + if (flags != 0) + return (__db_ferr(dbenv, "DB_ENV->repmgr_set_local_site", 0)); + + db_rep = dbenv->rep_handle; + if (db_rep->my_addr.port != 0) { + __db_errx(dbenv, "Listen address already set"); + return (EINVAL); + } + + /* + * If we haven't been given a sharable local host name, we must get one + * for the purpose of sharing with our friends, but it's ok to use for + * the address look-up. "Helpfully" converting special names such as + * "localhost" in this same way is tempting, but in practice turns out + * to be too tricky. + */ + if (host == NULL) { + if ((ret = gethostname(buffer, sizeof(buffer))) != 0) + return (net_errno); + + /* In case truncation leaves no terminating NUL byte: */ + buffer[sizeof(buffer) - 1] = '\0'; + sharable_host = buffer; + } else + sharable_host = host; + + if ((ret = __repmgr_getaddr(dbenv, + host, port, AI_PASSIVE, &address_list)) != 0) + return (ret); + + if ((ret = __repmgr_pack_netaddr(dbenv, + sharable_host, port, address_list, &addr)) != 0) { + __db_freeaddrinfo(dbenv, address_list); + return (ret); + } + + if (REPMGR_SYNC_INITED(db_rep)) { + LOCK_MUTEX(db_rep->mutex); + locked = TRUE; + } else + locked = FALSE; + + memcpy(&db_rep->my_addr, &addr, sizeof(addr)); + + if (locked) + UNLOCK_MUTEX(db_rep->mutex); + return (0); +} + +/* + * If the application only calls this method from a single thread (e.g., during + * its initialization), it will avoid the problems with the non-thread-safe host + * name lookup. In any case, if we relegate the blocking lookup to here it + * won't affect our select() loop. + * + * PUBLIC: int __repmgr_add_remote_site __P((DB_ENV *, const char *, u_int, + * PUBLIC: int *, u_int32_t)); + */ +int +__repmgr_add_remote_site(dbenv, host, port, eidp, flags) + DB_ENV *dbenv; + const char *host; + u_int port; + int *eidp; + u_int32_t flags; +{ + DB_REP *db_rep; + REPMGR_SITE *site; + int eid, locked, ret; + + if ((ret = __db_fchk(dbenv, + "DB_ENV->repmgr_add_remote_site", flags, DB_REPMGR_PEER)) != 0) + return (ret); + + if (host == NULL) { + __db_errx(dbenv, + "repmgr_add_remote_site: host name is required"); + return (EINVAL); + } + + db_rep = dbenv->rep_handle; + + if (REPMGR_SYNC_INITED(db_rep)) { + LOCK_MUTEX(db_rep->mutex); + locked = TRUE; + } else + locked = FALSE; + + if ((ret = __repmgr_add_site(dbenv, host, port, &site)) != 0) + goto unlock; + eid = EID_FROM_SITE(site); + + if (LF_ISSET(DB_REPMGR_PEER)) + db_rep->peer = eid; + if (eidp != NULL) + *eidp = eid; + +unlock: if (locked) + UNLOCK_MUTEX(db_rep->mutex); + return (ret); +} diff --git a/db/repmgr/repmgr_msg.c b/db/repmgr/repmgr_msg.c new file mode 100644 index 000000000..ceeeb968d --- /dev/null +++ b/db/repmgr/repmgr_msg.c @@ -0,0 +1,333 @@ +/*- + * See the file LICENSE for redistribution information. + * + * Copyright (c) 2005-2006 + * Oracle Corporation. All rights reserved. + * + * $Id: repmgr_msg.c,v 1.23 2006/09/11 15:15:20 bostic Exp $ + */ + +#include "db_config.h" + +#define __INCLUDE_NETWORKING 1 +#include "db_int.h" + +static int message_loop __P((DB_ENV *)); +static int process_message __P((DB_ENV*, DBT*, DBT*, int)); +static int handle_newsite __P((DB_ENV *, const DBT *)); +static int ack_message __P((DB_ENV *, u_int32_t, DB_LSN *)); + +/* + * PUBLIC: void *__repmgr_msg_thread __P((void *)); + */ +void * +__repmgr_msg_thread(args) + void *args; +{ + DB_ENV *dbenv = args; + int ret; + + if ((ret = message_loop(dbenv)) != 0) { + __db_err(dbenv, ret, "message thread failed"); + __repmgr_thread_failure(dbenv, ret); + } + return (NULL); +} + +static int +message_loop(dbenv) + DB_ENV *dbenv; +{ + REPMGR_MESSAGE *msg; + int ret; +#ifdef DIAGNOSTIC + DB_MSGBUF mb; +#endif + + while ((ret = __repmgr_queue_get(dbenv, &msg)) == 0) { + while ((ret = process_message(dbenv, &msg->control, &msg->rec, + msg->originating_eid)) == DB_LOCK_DEADLOCK) + RPRINT(dbenv, (dbenv, &mb, "repmgr deadlock retry")); + + __os_free(dbenv, msg); + if (ret != 0) + return (ret); + } + + return (ret == DB_REP_UNAVAIL ? 0 : ret); +} + +static int +process_message(dbenv, control, rec, eid) + DB_ENV *dbenv; + DBT *control, *rec; + int eid; +{ + DB_REP *db_rep; + REP *rep; + DB_LSN permlsn; + int ret; + u_int32_t generation; +#ifdef DIAGNOSTIC + DB_MSGBUF mb; +#endif + + db_rep = dbenv->rep_handle; + + /* + * Save initial generation number, in case it changes in a close race + * with a NEWMASTER. See msgdir.10000/10039/msg00086.html. + */ + generation = db_rep->generation; + + switch (ret = + __rep_process_message(dbenv, control, rec, &eid, &permlsn)) { + case DB_REP_NEWSITE: + return (handle_newsite(dbenv, rec)); + + case DB_REP_NEWMASTER: + db_rep->found_master = TRUE; + /* Check if it's us. */ + if ((db_rep->master_eid = eid) == SELF_EID) { + if ((ret = __repmgr_become_master(dbenv)) != 0) + return (ret); + } else { + /* + * Since we have no further need for 'eid' throughout + * the remainder of this function, it's (relatively) + * safe to pass its address directly to the + * application. If that were not the case, we could + * instead copy it into a scratch variable. + */ + RPRINT(dbenv, + (dbenv, &mb, "firing NEWMASTER (%d) event", eid)); + DB_EVENT(dbenv, DB_EVENT_REP_NEWMASTER, &eid); + if ((ret = __repmgr_stash_generation(dbenv)) != 0) + return (ret); + } + break; + + case DB_REP_HOLDELECTION: + LOCK_MUTEX(db_rep->mutex); + db_rep->master_eid = DB_EID_INVALID; + ret = __repmgr_init_election(dbenv, ELECT_ELECTION); + UNLOCK_MUTEX(db_rep->mutex); + if (ret != 0) + return (ret); + break; + + case DB_REP_DUPMASTER: + LOCK_MUTEX(db_rep->mutex); + db_rep->master_eid = DB_EID_INVALID; + ret = __repmgr_init_election(dbenv, ELECT_REPSTART); + UNLOCK_MUTEX(db_rep->mutex); + if (ret != 0) + return (ret); + break; + + case DB_REP_ISPERM: + /* + * Don't bother sending ack if master doesn't care about it. + */ + rep = db_rep->region; + if (db_rep->perm_policy == DB_REPMGR_ACKS_NONE || + (IS_PEER_POLICY(db_rep->perm_policy) && + rep->priority == 0)) + break; + + if ((ret = ack_message(dbenv, generation, &permlsn)) != 0) + return (ret); + + break; + + case DB_REP_NOTPERM: /* FALLTHROUGH */ + case DB_REP_IGNORE: /* FALLTHROUGH */ + case DB_LOCK_DEADLOCK: /* FALLTHROUGH */ + case 0: + break; + default: + __db_err(dbenv, ret, "DB_ENV->rep_process_message"); + return (ret); + } + return (0); +} + +/* + * Acknowledges a message. + */ +static int +ack_message(dbenv, generation, lsn) + DB_ENV *dbenv; + u_int32_t generation; + DB_LSN *lsn; +{ + DB_REP *db_rep; + REPMGR_SITE *site; + REPMGR_CONNECTION *conn; + DB_REPMGR_ACK ack; + DBT control2, rec2; + int ret; +#ifdef DIAGNOSTIC + DB_MSGBUF mb; +#endif + + db_rep = dbenv->rep_handle; + /* + * Regardless of where a message came from, all ack's go to the master + * site. If we're not in touch with the master, we drop it, since + * there's not much else we can do. + */ + if (!IS_VALID_EID(db_rep->master_eid) || + db_rep->master_eid == SELF_EID) { + RPRINT(dbenv, (dbenv, &mb, + "dropping ack with master %d", db_rep->master_eid)); + return (0); + } + + ret = 0; + LOCK_MUTEX(db_rep->mutex); + site = SITE_FROM_EID(db_rep->master_eid); + if (site->state == SITE_CONNECTED && + !F_ISSET(site->ref.conn, CONN_CONNECTING)) { + + ack.generation = generation; + memcpy(&ack.lsn, lsn, sizeof(DB_LSN)); + control2.data = &ack; + control2.size = sizeof(ack); + rec2.size = 0; + + conn = site->ref.conn; + if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_ACK, + &control2, &rec2)) == DB_REP_UNAVAIL) + ret = __repmgr_bust_connection(dbenv, conn, FALSE); + } + + UNLOCK_MUTEX(db_rep->mutex); + return (ret); +} + +/* + * Does everything necessary to handle the processing of a NEWSITE return. + */ +static int +handle_newsite(dbenv, rec) + DB_ENV *dbenv; + const DBT *rec; +{ + ADDRINFO *ai; + DB_REP *db_rep; + REPMGR_SITE *site; + repmgr_netaddr_t *addr; + size_t hlen; + u_int16_t port; + int ret; + char *host; +#ifdef DIAGNOSTIC + DB_MSGBUF mb; + SITE_STRING_BUFFER buffer; +#endif + + db_rep = dbenv->rep_handle; + /* + * Check if we got sent connect information and if we did, if + * this is me or if we already have a connection to this new + * site. If we don't, establish a new one. + * + * Unmarshall the cdata: a 2-byte port number, in network byte order, + * followed by the host name string, which should already be + * null-terminated, but let's make sure. + */ + if (rec->size < sizeof(port) + 1) { + __db_errx(dbenv, "unexpected cdata size, msg ignored"); + return (0); + } + memcpy(&port, rec->data, sizeof(port)); + port = ntohs(port); + + host = (char*)((u_int8_t*)rec->data + sizeof(port)); + hlen = (rec->size - sizeof(port)) - 1; + host[hlen] = '\0'; + + /* It's me, do nothing. */ + if (strcmp(host, db_rep->my_addr.host) == 0 && + port == db_rep->my_addr.port) { + RPRINT(dbenv, (dbenv, &mb, "repmgr ignores own NEWSITE info")); + return (0); + } + + LOCK_MUTEX(db_rep->mutex); + if ((ret = __repmgr_add_site(dbenv, host, port, &site)) == EEXIST) { + RPRINT(dbenv, (dbenv, &mb, + "NEWSITE info from %s was already known", + __repmgr_format_site_loc(site, buffer))); + /* + * TODO: test this. Is this really how it works? When + * a site comes back on-line, do we really get NEWSITE? + * Or is that return code reserved for only the first + * time a site joins a group? + */ + + /* + * TODO: it seems like it might be a good idea to move + * this site's retry up to the beginning of the queue, + * and try it now, on the theory that if it's + * generating a NEWSITE, it might have woken up. Does + * that pose a problem for my assumption of time-ordered + * retry list? I guess not, if we can reorder items. + */ + + /* + * In case we already know about this site only because it + * first connected to us, we may not yet have had a chance to + * look up its addresses. Even though we don't need them just + * now, this is an advantageous opportunity to get them since we + * can do so away from the critical select thread. + */ + addr = &site->net_addr; + if (addr->address_list == NULL && + __repmgr_getaddr(dbenv, + addr->host, addr->port, 0, &ai) == 0) + addr->address_list = ai; + + ret = 0; + if (site->state == SITE_IDLE) { + /* + * TODO: yank the retry object up to the front + * of the queue, after marking it as due now + */ + } else + goto unlock; /* Nothing to do. */ + } else { + RPRINT(dbenv, (dbenv, &mb, "NEWSITE info added %s", + __repmgr_format_site_loc(site, buffer))); + if (ret != 0) + goto unlock; + } + + /* + * Wake up the main thread to connect to the new or reawakened + * site. + */ + ret = __repmgr_wake_main_thread(dbenv); + +unlock: UNLOCK_MUTEX(db_rep->mutex); + return (ret); +} + +/* + * PUBLIC: int __repmgr_stash_generation __P((DB_ENV *)); + */ +int +__repmgr_stash_generation(dbenv) + DB_ENV *dbenv; +{ + DB_REP_STAT *statp; + int ret; + + if ((ret = __rep_stat_pp(dbenv, &statp, 0)) != 0) + return (ret); + dbenv->rep_handle->generation = statp->st_gen; + + __os_ufree(dbenv, statp); + return (0); +} diff --git a/db/repmgr/repmgr_net.c b/db/repmgr/repmgr_net.c new file mode 100644 index 000000000..15402e490 --- /dev/null +++ b/db/repmgr/repmgr_net.c @@ -0,0 +1,1041 @@ +/*- + * See the file LICENSE for redistribution information. + * + * Copyright (c) 2005-2006 + * Oracle Corporation. All rights reserved. + * + * $Id: repmgr_net.c,v 1.39 2006/09/19 14:14:11 mjc Exp $ + */ + +#include "db_config.h" + +#define __INCLUDE_NETWORKING 1 +#include "db_int.h" + +/* + * The functions in this module implement a simple wire protocol for + * transmitting messages, both replication messages and our own internal control + * messages. The protocol is as follows: + * + * 1 byte - message type (defined in repmgr_int.h) + * 4 bytes - size of control + * 4 bytes - size of rec + * ? bytes - control + * ? bytes - rec + * + * where both sizes are 32-bit binary integers in network byte order. + * Either control or rec can have zero length, but even in this case the + * 4-byte length will be present. + * Putting both lengths right up at the front allows us to read in fewer + * phases, and allows us to allocate buffer space for both parts (plus a wrapper + * struct) at once. + */ + +/* + * In sending a message, we first try to send it in-line, in the sending thread, + * and without first copying the message, by using scatter/gather I/O, using + * iovecs to point to the various pieces of the message. If that all works + * without blocking, that's optimal. + * If we find that, for a particular connection, we can't send without + * blocking, then we must copy the message for sending later in the select() + * thread. In the course of doing that, we might as well "flatten" the message, + * forming one single buffer, to simplify life. Not only that, once we've gone + * to the trouble of doing that, other sites to which we also want to send the + * message (in the case of a broadcast), may as well take advantage of the + * simplified structure also. + * This structure holds it all. Note that this structure, and the + * "flat_msg" structure, are allocated separately, because (1) the flat_msg + * version is usually not needed; and (2) when it is needed, it will need to + * live longer than the wrapping sending_msg structure. + * Note that, for the broadcast case, where we're going to use this + * repeatedly, the iovecs is a template that must be copied, since in normal use + * the iovecs pointers and lengths get adjusted after every partial write. + * + * TODO: this is such an important point that it's probably worth renaming the + * iovecs field here to iovecs_template. + */ +struct sending_msg { + REPMGR_IOVECS iovecs; + u_int8_t type; + u_int32_t control_size_buf, rec_size_buf; + REPMGR_FLAT *fmsg; +}; + +static int __repmgr_send_broadcast + __P((DB_ENV *, const DBT *, const DBT *, u_int *, u_int *)); +static void setup_sending_msg + __P((struct sending_msg *, u_int, const DBT *, const DBT *)); +static int __repmgr_send_internal + __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *)); +static int enqueue_msg + __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, size_t)); +static int flatten __P((DB_ENV *, struct sending_msg *)); +static REPMGR_SITE *__repmgr_available_site __P((DB_ENV *, int)); + +/* + * __repmgr_send -- + * The send function for DB_ENV->rep_set_transport. + * + * PUBLIC: int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *, + * PUBLIC: const DB_LSN *, int, u_int32_t)); + */ +int +__repmgr_send(dbenv, control, rec, lsnp, eid, flags) + DB_ENV *dbenv; + const DBT *control, *rec; + const DB_LSN *lsnp; + int eid; + u_int32_t flags; +{ + DB_REP *db_rep; + u_int nsites, npeers, available, needed; + int ret, t_ret; + REPMGR_SITE *site; + REPMGR_CONNECTION *conn; +#ifdef DIAGNOSTIC + DB_MSGBUF mb; +#endif + + db_rep = dbenv->rep_handle; + + LOCK_MUTEX(db_rep->mutex); + if (eid == DB_EID_BROADCAST) { + if ((ret = __repmgr_send_broadcast(dbenv, control, rec, + &nsites, &npeers)) != 0) + goto out; + } else { + /* + * If this is a request that can be sent anywhere, then see if + * we can send it to our peer (to save load on the master), but + * not if it's a rerequest, 'cuz that likely means we tried this + * already and failed. + */ + if ((flags & (DB_REP_ANYWHERE | DB_REP_REREQUEST)) == + DB_REP_ANYWHERE && + IS_VALID_EID(db_rep->peer) && + (site = __repmgr_available_site(dbenv, db_rep->peer)) != + NULL) { + RPRINT(dbenv, (dbenv, &mb, "sending request to peer")); + } else if ((site = __repmgr_available_site(dbenv, eid)) == + NULL) { + RPRINT(dbenv, (dbenv, &mb, + "ignoring message sent to unavailable site")); + ret = DB_REP_UNAVAIL; + goto out; + } + + conn = site->ref.conn; + if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_REP_MESSAGE, + control, rec)) == DB_REP_UNAVAIL && + (t_ret = __repmgr_bust_connection(dbenv, conn, FALSE)) != 0) + ret = t_ret; + if (ret != 0) + goto out; + + nsites = 1; + npeers = site->priority > 0 ? 1 : 0; + } + /* + * Right now, nsites and npeers represent the (maximum) number of sites + * we've attempted to begin sending the message to. Of course we + * haven't really received any ack's yet. But since we've only sent to + * nsites/npeers other sites, that's the maximum number of ack's we + * could possibly expect. If even that number fails to satisfy our PERM + * policy, there's no point waiting for something that will never + * happen. + */ + if (LF_ISSET(DB_REP_PERMANENT)) { + switch (db_rep->perm_policy) { + case DB_REPMGR_ACKS_NONE: + goto out; + + case DB_REPMGR_ACKS_ONE: + needed = 1; + available = nsites; + break; + + case DB_REPMGR_ACKS_ALL: + /* Number of sites in the group besides myself. */ + needed = __repmgr_get_nsites(db_rep) - 1; + available = nsites; + break; + + case DB_REPMGR_ACKS_ONE_PEER: + needed = 1; + available = npeers; + break; + + case DB_REPMGR_ACKS_ALL_PEERS: + /* + * Too hard to figure out "needed", since we're not + * keeping track of how many peers we have; so just skip + * the optimization in this case. + */ + needed = 1; + available = npeers; + break; + + case DB_REPMGR_ACKS_QUORUM: + /* + * The minimum number of acks necessary to ensure that + * the transaction is durable if an election is held. + */ + needed = (__repmgr_get_nsites(db_rep) - 1) / 2; + available = npeers; + break; + + default: + COMPQUIET(available, 0); + COMPQUIET(needed, 0); + (void)__db_unknown_path(dbenv, "__repmgr_send"); + break; + } + if (available < needed) { + ret = DB_REP_UNAVAIL; + goto out; + } + /* In ALL_PEERS case, display of "needed" might be confusing. */ + RPRINT(dbenv, (dbenv, &mb, + "will await acknowledgement: need %u", needed)); + ret = __repmgr_await_ack(dbenv, lsnp); + } + +out: UNLOCK_MUTEX(db_rep->mutex); + + return (ret); +} + +static REPMGR_SITE * +__repmgr_available_site(dbenv, eid) + DB_ENV *dbenv; + int eid; +{ + DB_REP *db_rep; + REPMGR_SITE *site; + + db_rep = dbenv->rep_handle; + site = SITE_FROM_EID(eid); + if (site->state != SITE_CONNECTED) + return (NULL); + + if (F_ISSET(site->ref.conn, CONN_CONNECTING)) + return (NULL); + return (site); +} + +/* + * Sends message to all sites with which we currently have an active + * connection. Sets result parameters according to how many sites we attempted + * to begin sending to, even if we did nothing more than queue it for later + * delivery. + * + * !!! + * Caller must hold dbenv->mutex. + */ +static int +__repmgr_send_broadcast(dbenv, control, rec, nsitesp, npeersp) + DB_ENV *dbenv; + const DBT *control, *rec; + u_int *nsitesp, *npeersp; +{ + DB_REP *db_rep; + struct sending_msg msg; + REPMGR_CONNECTION *conn; + REPMGR_SITE *site; + u_int nsites, npeers; + int ret; + + db_rep = dbenv->rep_handle; + + setup_sending_msg(&msg, REPMGR_REP_MESSAGE, control, rec); + nsites = npeers = 0; + + /* + * Traverse the connections list. Here, even in bust_connection, we + * don't unlink the current list entry, so we can use the TAILQ_FOREACH + * macro. + */ + TAILQ_FOREACH(conn, &db_rep->connections, entries) { + if (F_ISSET(conn, CONN_CONNECTING | CONN_DEFUNCT) || + !IS_VALID_EID(conn->eid)) + continue; + + if ((ret = __repmgr_send_internal(dbenv, conn, &msg)) == 0) { + site = SITE_FROM_EID(conn->eid); + nsites++; + if (site->priority > 0) + npeers++; + } else if (ret == DB_REP_UNAVAIL) { + if ((ret = __repmgr_bust_connection( + dbenv, conn, FALSE)) != 0) + return (ret); + } else + return (ret); + } + + *nsitesp = nsites; + *npeersp = npeers; + return (0); +} + +/* + * __repmgr_send_one -- + * Send a message to a site, or if you can't just yet, make a copy of it + * and arrange to have it sent later. 'rec' may be NULL, in which case we send + * a zero length and no data. + * + * If we get an error, we take care of cleaning up the connection (calling + * __repmgr_bust_connection()), so that the caller needn't do so. + * + * !!! + * Note that the mutex should be held through this call. + * It doubles as a synchronizer to make sure that two threads don't + * intersperse writes that are part of two single messages. + * + * PUBLIC: int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *, + * PUBLIC: u_int, const DBT *, const DBT *)); + */ +int +__repmgr_send_one(dbenv, conn, msg_type, control, rec) + DB_ENV *dbenv; + REPMGR_CONNECTION *conn; + u_int msg_type; + const DBT *control, *rec; +{ + struct sending_msg msg; + + setup_sending_msg(&msg, msg_type, control, rec); + return (__repmgr_send_internal(dbenv, conn, &msg)); +} + +/* + * Attempts a "best effort" to send a message on the given site. If there is an + * excessive backlog of message already queued on the connection, we simply drop + * this message, and still return 0 even in this case. + */ +static int +__repmgr_send_internal(dbenv, conn, msg) + DB_ENV *dbenv; + REPMGR_CONNECTION *conn; + struct sending_msg *msg; +{ +#define OUT_QUEUE_LIMIT 10 /* arbitrary, for now */ + REPMGR_IOVECS iovecs; + int ret; + size_t nw; + size_t total_written; +#ifdef DIAGNOSTIC + DB_MSGBUF mb; + SITE_STRING_BUFFER buffer; +#endif + + DB_ASSERT(dbenv, !F_ISSET(conn, CONN_CONNECTING)); + if (!STAILQ_EMPTY(&conn->outbound_queue)) { + /* + * Output to this site is currently owned by the select() + * thread, so we can't try sending in-line here. We can only + * queue the msg for later. + */ + RPRINT(dbenv, (dbenv, &mb, "msg to %s to be queued", + __repmgr_format_eid_loc(dbenv->rep_handle, + conn->eid, buffer))); + if (conn->out_queue_length < OUT_QUEUE_LIMIT) + return (enqueue_msg(dbenv, conn, msg, 0)); + else { + RPRINT(dbenv, (dbenv, &mb, "queue limit exceeded")); +/* repmgr->stats.tossed_msgs++; */ + return (0); + } + } + + /* + * Send as much data to the site as we can, without blocking. Keep + * writing as long as we're making some progress. Make a scratch copy + * of iovecs for our use, since we destroy it in the process of + * adjusting pointers after each partial I/O. + */ + memcpy(&iovecs, &msg->iovecs, sizeof(iovecs)); + total_written = 0; + while ((ret = __repmgr_writev(conn->fd, &iovecs.vectors[iovecs.offset], + iovecs.count-iovecs.offset, &nw)) == 0) { + total_written += nw; + if (__repmgr_update_consumed(&iovecs, nw)) /* all written */ + return (0); + } + + if (ret != WOULDBLOCK) { + __db_err(dbenv, ret, "socket writing failure"); + return (DB_REP_UNAVAIL); + } + + RPRINT(dbenv, (dbenv, &mb, "wrote only %lu bytes to %s", + (u_long)total_written, + __repmgr_format_eid_loc(dbenv->rep_handle, conn->eid, buffer))); + /* + * We can't send any more without blocking: queue (a pointer to) a + * "flattened" copy of the message, so that the select() thread will + * finish sending it later. + */ + if ((ret = enqueue_msg(dbenv, conn, msg, total_written)) != 0) + return (ret); + +/* repmgr->stats.partial_retransmissions++; */ + + /* + * Wake the main select thread so that it can discover that it has + * received ownership of this connection. Note that we didn't have to + * do this in the previous case (above), because the non-empty queue + * implies that the select() thread is already managing ownership of + * this connection. + */ +#ifdef DB_WIN32 + if (WSAEventSelect(conn->fd, conn->event_object, + FD_READ|FD_WRITE|FD_CLOSE) == SOCKET_ERROR) { + ret = net_errno; + __db_err(dbenv, ret, "can't add FD_WRITE event bit"); + return (ret); + } +#endif + return (__repmgr_wake_main_thread(dbenv)); +} + +/* + * PUBLIC: int __repmgr_is_permanent __P((DB_ENV *, const DB_LSN *)); + * + * Count up how many sites have ack'ed the given LSN. Returns TRUE if enough + * sites have ack'ed; FALSE otherwise. + * + * !!! + * Caller must hold the mutex. + */ +int +__repmgr_is_permanent(dbenv, lsnp) + DB_ENV *dbenv; + const DB_LSN *lsnp; +{ + DB_REP *db_rep; + REPMGR_SITE *site; + u_int eid, nsites, npeers; + int is_perm, has_missing_peer; + + db_rep = dbenv->rep_handle; + + if (db_rep->perm_policy == DB_REPMGR_ACKS_NONE) + return (TRUE); + + nsites = npeers = 0; + has_missing_peer = FALSE; + for (eid = 0; eid < db_rep->site_cnt; eid++) { + site = SITE_FROM_EID(eid); + if (site->priority == -1) { + /* + * Never connected to this site: since we can't know + * whether it's a peer, assume the worst. + */ + has_missing_peer = TRUE; + continue; + } + + if (log_compare(&site->max_ack, lsnp) >= 0) { + nsites++; + if (site->priority > 0) + npeers++; + } else { + /* This site hasn't ack'ed the message. */ + if (site->priority > 0) + has_missing_peer = TRUE; + } + } + + switch (db_rep->perm_policy) { + case DB_REPMGR_ACKS_ONE: + is_perm = (nsites >= 1); + break; + case DB_REPMGR_ACKS_ONE_PEER: + is_perm = (npeers >= 1); + break; + case DB_REPMGR_ACKS_QUORUM: + /* + * The minimum number of acks necessary to ensure that the + * transaction is durable if an election is held (given that we + * always conduct elections according to the standard, + * recommended practice of requiring votes from a majority of + * sites). + */ + if (__repmgr_get_nsites(db_rep) == 2) { + /* + * A group of 2 sites is, as always, a special case. + * For a transaction to be durable the other site has to + * have received it. + */ + is_perm = (npeers >= 1); + } else + is_perm = (npeers >= (__repmgr_get_nsites(db_rep)-1)/2); + break; + case DB_REPMGR_ACKS_ALL: + /* Adjust by 1, since get_nsites includes local site. */ + is_perm = (nsites >= __repmgr_get_nsites(db_rep) - 1); + break; + case DB_REPMGR_ACKS_ALL_PEERS: + if (db_rep->site_cnt < __repmgr_get_nsites(db_rep) - 1) { + /* Assume missing site might be a peer. */ + has_missing_peer = TRUE; + } + is_perm = !has_missing_peer; + break; + default: + is_perm = FALSE; + (void)__db_unknown_path(dbenv, "__repmgr_is_permanent"); + } + return (is_perm); +} + +/* + * Abandons a connection, to recover from an error. Upon entry the conn struct + * must be on the connections list. + * + * If the 'do_close' flag is true, we do the whole job; the clean-up includes + * removing the struct from the list and freeing all its memory, so upon return + * the caller must not refer to it any further. Otherwise, we merely mark the + * connection for clean-up later by the main thread. + * + * PUBLIC: int __repmgr_bust_connection __P((DB_ENV *, + * PUBLIC: REPMGR_CONNECTION *, int)); + * + * !!! + * Caller holds mutex. + */ +int +__repmgr_bust_connection(dbenv, conn, do_close) + DB_ENV *dbenv; + REPMGR_CONNECTION *conn; + int do_close; +{ + DB_REP *db_rep; + int ret, eid; + + db_rep = dbenv->rep_handle; + ret = 0; + + DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections)); + eid = conn->eid; + if (do_close) + __repmgr_cleanup_connection(dbenv, conn); + else { + F_SET(conn, CONN_DEFUNCT); + conn->eid = -1; + } + + /* + * When we first accepted the incoming connection, we set conn->eid to + * -1 to indicate that we didn't yet know what site it might be from. + * If we then get here because we later decide it was a redundant + * connection, the following scary stuff will correctly not happen. + */ + if (IS_VALID_EID(eid)) { + /* schedule_connection_attempt wakes the main thread. */ + if ((ret = __repmgr_schedule_connection_attempt( + dbenv, (u_int)eid, FALSE)) != 0) + return (ret); + + if (eid == db_rep->master_eid) { + db_rep->master_eid = DB_EID_INVALID; + + if ((ret = __repmgr_init_election( + dbenv, ELECT_FAILURE_ELECTION)) != 0) + return (ret); + } + } else if (!do_close) { + /* + * One way or another, make sure the main thread is poked, so + * that we do the deferred clean-up. + */ + ret = __repmgr_wake_main_thread(dbenv); + } + return (ret); +} + +/* + * PUBLIC: void __repmgr_cleanup_connection + * PUBLIC: __P((DB_ENV *, REPMGR_CONNECTION *)); + */ +void +__repmgr_cleanup_connection(dbenv, conn) + DB_ENV *dbenv; + REPMGR_CONNECTION *conn; +{ + DB_REP *db_rep; + QUEUED_OUTPUT *out; + REPMGR_FLAT *msg; + DBT *dbt; + + db_rep = dbenv->rep_handle; + + TAILQ_REMOVE(&db_rep->connections, conn, entries); + (void)closesocket(conn->fd); +#ifdef DB_WIN32 + (void)WSACloseEvent(conn->event_object); +#endif + + /* + * Deallocate any input and output buffers we may have. + */ + if (conn->reading_phase == DATA_PHASE) { + if (conn->msg_type == REPMGR_REP_MESSAGE) + __os_free(dbenv, conn->input.rep_message); + else { + dbt = &conn->input.repmgr_msg.cntrl; + __os_free(dbenv, dbt->data); + dbt = &conn->input.repmgr_msg.rec; + if (dbt->size > 0) + __os_free(dbenv, dbt->data); + } + } + while (!STAILQ_EMPTY(&conn->outbound_queue)) { + out = STAILQ_FIRST(&conn->outbound_queue); + STAILQ_REMOVE_HEAD(&conn->outbound_queue, entries); + msg = out->msg; + if (--msg->ref_count <= 0) + __os_free(dbenv, msg); + __os_free(dbenv, out); + } + + __os_free(dbenv, conn); +} + +static int +enqueue_msg(dbenv, conn, msg, offset) + DB_ENV *dbenv; + REPMGR_CONNECTION *conn; + struct sending_msg *msg; + size_t offset; +{ + QUEUED_OUTPUT *q_element; + int ret; + + if (msg->fmsg == NULL && ((ret = flatten(dbenv, msg)) != 0)) + return (ret); + if ((ret = __os_malloc(dbenv, sizeof(QUEUED_OUTPUT), &q_element)) != 0) + return (ret); + q_element->msg = msg->fmsg; + msg->fmsg->ref_count++; /* encapsulation would be sweeter */ + q_element->offset = offset; + + /* Put it on the connection's outbound queue. */ + STAILQ_INSERT_TAIL(&conn->outbound_queue, q_element, entries); + conn->out_queue_length++; + return (0); +} + +/* + * The 'rec' DBT can be NULL, in which case we treat it like a zero-length DBT. + * But 'control' is always present. + */ +static void +setup_sending_msg(msg, type, control, rec) + struct sending_msg *msg; + u_int type; + const DBT *control, *rec; +{ + u_int32_t rec_size; + + /* + * The wire protocol is documented in a comment at the top of this + * module. + */ + __repmgr_iovec_init(&msg->iovecs); + msg->type = type; + __repmgr_add_buffer(&msg->iovecs, &msg->type, sizeof(msg->type)); + + msg->control_size_buf = htonl(control->size); + __repmgr_add_buffer(&msg->iovecs, + &msg->control_size_buf, sizeof(msg->control_size_buf)); + + rec_size = rec == NULL ? 0 : rec->size; + msg->rec_size_buf = htonl(rec_size); + __repmgr_add_buffer( + &msg->iovecs, &msg->rec_size_buf, sizeof(msg->rec_size_buf)); + + if (control->size > 0) + __repmgr_add_dbt(&msg->iovecs, control); + + if (rec_size > 0) + __repmgr_add_dbt(&msg->iovecs, rec); + + msg->fmsg = NULL; +} + +/* + * Convert a message stored as iovec pointers to various pieces, into flattened + * form, by copying all the pieces, and then make the iovec just point to the + * new simplified form. + */ +static int +flatten(dbenv, msg) + DB_ENV *dbenv; + struct sending_msg *msg; +{ + u_int8_t *p; + size_t msg_size; + int i, ret; + + DB_ASSERT(dbenv, msg->fmsg == NULL); + + msg_size = msg->iovecs.total_bytes; + if ((ret = __os_malloc(dbenv, sizeof(*msg->fmsg) + msg_size, + &msg->fmsg)) != 0) + return (ret); + msg->fmsg->length = msg_size; + msg->fmsg->ref_count = 0; + p = &msg->fmsg->data[0]; + + for (i = 0; i < msg->iovecs.count; i++) { + memcpy(p, msg->iovecs.vectors[i].iov_base, + msg->iovecs.vectors[i].iov_len); + p = &p[msg->iovecs.vectors[i].iov_len]; + } + __repmgr_iovec_init(&msg->iovecs); + __repmgr_add_buffer(&msg->iovecs, &msg->fmsg->data[0], msg_size); + return (0); +} + +/* + * PUBLIC: int __repmgr_find_site __P((DB_ENV *, const char *, u_int)); + */ +int +__repmgr_find_site(dbenv, host, port) + DB_ENV *dbenv; + const char *host; + u_int port; +{ + DB_REP *db_rep; + REPMGR_SITE *site; + u_int i; + + db_rep = dbenv->rep_handle; + for (i = 0; i < db_rep->site_cnt; i++) { + site = &db_rep->sites[i]; + + if (strcmp(site->net_addr.host, host) == 0 && + site->net_addr.port == port) + return ((int)i); + } + + return (-1); +} + +/* + * Stash a copy of the given host name and port number into a convenient data + * structure so that we can save it permanently. This is kind of like a + * constructor for a netaddr object, except that the caller supplies the memory + * for the base struct (though not the subordinate attachments). + * + * All inputs are assumed to have been already validated. + * + * PUBLIC: int __repmgr_pack_netaddr __P((DB_ENV *, const char *, + * PUBLIC: u_int, ADDRINFO *, repmgr_netaddr_t *)); + */ +int +__repmgr_pack_netaddr(dbenv, host, port, list, addr) + DB_ENV *dbenv; + const char *host; + u_int port; + ADDRINFO *list; + repmgr_netaddr_t *addr; +{ + int ret; + + DB_ASSERT(dbenv, host != NULL); + + if ((ret = __os_strdup(dbenv, host, &addr->host)) != 0) + return (ret); + addr->port = (u_int16_t)port; + addr->address_list = list; + addr->current = NULL; + return (0); +} + +/* + * PUBLIC: int __repmgr_getaddr __P((DB_ENV *, + * PUBLIC: const char *, u_int, int, ADDRINFO **)); + */ +int +__repmgr_getaddr(dbenv, host, port, flags, result) + DB_ENV *dbenv; + const char *host; + u_int port; + int flags; /* Matches struct addrinfo declaration. */ + ADDRINFO **result; +{ + ADDRINFO *answer, hints; + int ret; + char buffer[10]; /* 2**16 fits in 5 digits. */ + + /* + * Ports are really 16-bit unsigned values, but it's too painful to + * push that type through the API. + */ + if (port > UINT16_MAX) { + __db_errx(dbenv, "port %u larger than max port %u", + port, UINT16_MAX); + return (EINVAL); + } + +#ifdef DB_WIN32 + if (!dbenv->rep_handle->wsa_inited && + (ret = __repmgr_wsa_init(dbenv)) != 0) + return (ret); +#endif + + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = flags; + (void)snprintf(buffer, sizeof(buffer), "%u", port); + if ((ret = + __db_getaddrinfo(dbenv, host, port, buffer, &hints, &answer)) != 0) + return (ret); + *result = answer; + + return (0); +} + +/* + * Adds a new site to our array of known sites (unless it already exists), + * and schedules it for immediate connection attempt. Whether it exists or not, + * we set newsitep, either to the already existing site, or to the newly created + * site. Unless newsitep is passed in as NULL, which is allowed. + * + * PUBLIC: int __repmgr_add_site + * PUBLIC: __P((DB_ENV *, const char *, u_int, REPMGR_SITE **)); + * + * !!! + * Caller is expected to hold the mutex. + */ +int +__repmgr_add_site(dbenv, host, port, newsitep) + DB_ENV *dbenv; + const char *host; + u_int port; + REPMGR_SITE **newsitep; +{ + DB_REP *db_rep; + ADDRINFO *address_list; + repmgr_netaddr_t addr; + REPMGR_SITE *site; + int ret, eid; + + ret = 0; + db_rep = dbenv->rep_handle; + + if (IS_VALID_EID(eid = __repmgr_find_site(dbenv, host, port))) { + site = SITE_FROM_EID(eid); + ret = EEXIST; + goto out; + } + + if ((ret = __repmgr_getaddr(dbenv, host, port, 0, &address_list)) != 0) + return (ret); + + if ((ret = __repmgr_pack_netaddr( + dbenv, host, port, address_list, &addr)) != 0) { + __db_freeaddrinfo(dbenv, address_list); + return (ret); + } + + if ((ret = __repmgr_new_site(dbenv, &site, &addr, SITE_IDLE)) != 0) { + __repmgr_cleanup_netaddr(dbenv, &addr); + return (ret); + } + + if (db_rep->selector != NULL && + (ret = __repmgr_schedule_connection_attempt( + dbenv, (u_int)EID_FROM_SITE(site), TRUE)) != 0) + return (ret); + + /* Note that we should only come here for success and EEXIST. */ +out: + if (newsitep != NULL) + *newsitep = site; + return (ret); +} + +/* + * Initializes net-related memory in the db_rep handle. + * + * PUBLIC: int __repmgr_net_create __P((DB_ENV *, DB_REP *)); + */ +int +__repmgr_net_create(dbenv, db_rep) + DB_ENV *dbenv; + DB_REP *db_rep; +{ + COMPQUIET(dbenv, NULL); + + db_rep->listen_fd = INVALID_SOCKET; + db_rep->master_eid = DB_EID_INVALID; + + TAILQ_INIT(&db_rep->connections); + TAILQ_INIT(&db_rep->retries); + + return (0); +} + +/* + * listen_socket_init -- + * Initialize a socket for listening. Sets + * a file descriptor for the socket, ready for an accept() call + * in a thread that we're happy to let block. + * + * PUBLIC: int __repmgr_listen __P((DB_ENV *)); + */ +int +__repmgr_listen(dbenv) + DB_ENV *dbenv; +{ + DB_REP *db_rep; + ADDRINFO *ai; + char *why; + int sockopt, ret; + socket_t s; + + db_rep = dbenv->rep_handle; + + /* Use OOB value as sentinel to show no socket open. */ + s = INVALID_SOCKET; + ai = ADDR_LIST_FIRST(&db_rep->my_addr); + + /* + * Given the assert is correct, we execute the loop at least once, which + * means 'why' will have been set by the time it's needed. But I guess + * lint doesn't know about DB_ASSERT. + */ + COMPQUIET(why, ""); + DB_ASSERT(dbenv, ai != NULL); + for (; ai != NULL; ai = ADDR_LIST_NEXT(&db_rep->my_addr)) { + + if ((s = socket(ai->ai_family, + ai->ai_socktype, ai->ai_protocol)) == INVALID_SOCKET) { + why = "can't create listen socket"; + continue; + } + + /* + * When testing, it's common to kill and restart regularly. On + * some systems, this causes bind to fail with "address in use" + * errors unless this option is set. + */ + sockopt = 1; + if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (sockopt_t)&sockopt, + sizeof(sockopt)) != 0) { + why = "can't set REUSEADDR socket option"; + break; + } + + if (bind(s, ai->ai_addr, (socklen_t)ai->ai_addrlen) != 0) { + why = "can't bind socket to listening address"; + (void)closesocket(s); + s = INVALID_SOCKET; + continue; + } + + if (listen(s, 5) != 0) { + why = "listen()"; + break; + } + + if ((ret = __repmgr_set_nonblocking(s)) != 0) { + __db_err(dbenv, ret, "can't unblock listen socket"); + goto clean; + } + + db_rep->listen_fd = s; + return (0); + } + + ret = net_errno; + __db_err(dbenv, ret, why); +clean: if (s != INVALID_SOCKET) + (void)closesocket(s); + return (ret); +} + +/* + * PUBLIC: int __repmgr_net_close __P((DB_ENV *)); + */ +int +__repmgr_net_close(dbenv) + DB_ENV *dbenv; +{ + DB_REP *db_rep; +#ifndef DB_WIN32 + struct sigaction sigact; +#endif + int ret; + + db_rep = dbenv->rep_handle; + if (db_rep->listen_fd == INVALID_SOCKET) + return (0); + + ret = 0; + + if (closesocket(db_rep->listen_fd) == SOCKET_ERROR) + ret = net_errno; + +#ifdef DB_WIN32 + /* Shut down the Windows sockets DLL. */ + if (WSACleanup() == SOCKET_ERROR && ret == 0) + ret = net_errno; + db_rep->wsa_inited = FALSE; +#else + /* Restore original SIGPIPE handling configuration. */ + if (db_rep->chg_sig_handler) { + memset(&sigact, 0, sizeof(sigact)); + sigact.sa_handler = SIG_DFL; + if (sigaction(SIGPIPE, &sigact, NULL) == -1 && ret == 0) + ret = errno; + } +#endif + db_rep->listen_fd = INVALID_SOCKET; + return (ret); +} + +/* + * PUBLIC: void __repmgr_net_destroy __P((DB_ENV *, DB_REP *)); + */ +void +__repmgr_net_destroy(dbenv, db_rep) + DB_ENV *dbenv; + DB_REP *db_rep; +{ + REPMGR_CONNECTION *conn; + REPMGR_RETRY *retry; + REPMGR_SITE *site; + u_int i; + + if (db_rep->sites == NULL) + return; + + /* + * TODO: I think maybe these, and especially connections, should be + * cleaned up in close(), 'cuz there's more than just memory there. + * Does it matter, I wonder? + */ + while (!TAILQ_EMPTY(&db_rep->retries)) { + retry = TAILQ_FIRST(&db_rep->retries); + TAILQ_REMOVE(&db_rep->retries, retry, entries); + __os_free(dbenv, retry); + } + + while (!TAILQ_EMPTY(&db_rep->connections)) { + conn = TAILQ_FIRST(&db_rep->connections); + __repmgr_cleanup_connection(dbenv, conn); + } + + for (i = 0; i < db_rep->site_cnt; i++) { + site = &db_rep->sites[i]; + __repmgr_cleanup_netaddr(dbenv, &site->net_addr); + } + __os_free(dbenv, db_rep->sites); + db_rep->sites = NULL; +} diff --git a/db/repmgr/repmgr_posix.c b/db/repmgr/repmgr_posix.c new file mode 100644 index 000000000..1a04fef25 --- /dev/null +++ b/db/repmgr/repmgr_posix.c @@ -0,0 +1,714 @@ +/*- + * See the file LICENSE for redistribution information. + * + * Copyright (c) 2005-2006 + * Oracle Corporation. All rights reserved. + * + * $Id: repmgr_posix.c,v 1.22 2006/09/11 15:15:20 bostic Exp $ + */ + +#include "db_config.h" + +#define __INCLUDE_NETWORKING 1 +#define __INCLUDE_SELECT_H 1 +#include "db_int.h" + +/* + * A very rough guess at the maximum stack space one of our threads could ever + * need, which we hope is plenty conservative. This can be patched in the field + * if necessary. + */ +#ifdef _POSIX_THREAD_ATTR_STACKSIZE +size_t __repmgr_guesstimated_max = (128 * 1024); +#endif + +static int finish_connecting __P((DB_ENV *, REPMGR_CONNECTION *)); + +/* + * Starts the thread described in the argument, and stores the resulting thread + * ID therein. + * + * PUBLIC: int __repmgr_thread_start __P((DB_ENV *, REPMGR_RUNNABLE *)); + */ +int +__repmgr_thread_start(dbenv, runnable) + DB_ENV *dbenv; + REPMGR_RUNNABLE *runnable; +{ + pthread_attr_t *attrp; +#ifdef _POSIX_THREAD_ATTR_STACKSIZE + pthread_attr_t attributes; + size_t size; + int ret; +#endif + + runnable->finished = FALSE; + +#ifdef _POSIX_THREAD_ATTR_STACKSIZE + attrp = &attributes; + if ((ret = pthread_attr_init(&attributes)) != 0) { + __db_err(dbenv, + ret, "pthread_attr_init in repmgr_thread_start"); + return (ret); + } + + /* + * On a 64-bit machine it seems reasonable that we could need twice as + * much stack space as we did on a 32-bit machine. + */ + size = __repmgr_guesstimated_max; + if (sizeof(size_t) > 4) + size *= 2; +#ifdef PTHREAD_STACK_MIN + if (size < PTHREAD_STACK_MIN) + size = PTHREAD_STACK_MIN; +#endif + if ((ret = pthread_attr_setstacksize(&attributes, size)) != 0) { + __db_err(dbenv, + ret, "pthread_attr_setstacksize in repmgr_thread_start"); + return (ret); + } +#else + attrp = NULL; +#endif + + return (pthread_create(&runnable->thread_id, attrp, + runnable->run, dbenv)); +} + +/* + * PUBLIC: int __repmgr_thread_join __P((REPMGR_RUNNABLE *)); + */ +int +__repmgr_thread_join(thread) + REPMGR_RUNNABLE *thread; +{ + return (pthread_join(thread->thread_id, NULL)); +} + +/* + * PUBLIC: int __repmgr_set_nonblocking __P((socket_t)); + */ +int __repmgr_set_nonblocking(fd) + socket_t fd; +{ + int flags; + + if ((flags = fcntl(fd, F_GETFL, 0)) < 0) + return (errno); + if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) + return (errno); + return (0); +} + +/* + * PUBLIC: int __repmgr_wake_waiting_senders __P((DB_ENV *)); + * + * Wake any send()-ing threads waiting for an acknowledgement. + * + * !!! + * Caller must hold the db_rep->mutex, if this thread synchronization is to work + * properly. + */ +int +__repmgr_wake_waiting_senders(dbenv) + DB_ENV *dbenv; +{ + return (pthread_cond_broadcast(&dbenv->rep_handle->ack_condition)); +} + +/* + * PUBLIC: int __repmgr_await_ack __P((DB_ENV *, const DB_LSN *)); + * + * Waits (a limited time) for configured number of remote sites to ack the given + * LSN. + * + * !!! + * Caller must hold repmgr->mutex (TODO: although that seems a shame, with all + * that deadline calculation). + */ +int +__repmgr_await_ack(dbenv, lsnp) + DB_ENV *dbenv; + const DB_LSN *lsnp; +{ + DB_REP *db_rep; + struct timespec deadline; + int ret, timed; + + db_rep = dbenv->rep_handle; + + if ((timed = (db_rep->ack_timeout > 0))) + __repmgr_compute_wait_deadline(dbenv, &deadline, + db_rep->ack_timeout); + else + COMPQUIET(deadline.tv_sec, 0); + + while (!__repmgr_is_permanent(dbenv, lsnp)) { + if (timed) + ret = pthread_cond_timedwait(&db_rep->ack_condition, + &db_rep->mutex, &deadline); + else + ret = pthread_cond_wait(&db_rep->ack_condition, + &db_rep->mutex); + if (db_rep->finished) + return (DB_REP_UNAVAIL); + if (ret != 0) + return (ret); /* TODO: but first check if we need (to + * create) a panic */ + } + return (0); +} + +/* + * Computes a deadline time a certain distance into the future, in a form + * suitable for the pthreads timed wait operation. Curiously, that call uses + * nano-second resolution; elsewhere we use microseconds. + * + * PUBLIC: void __repmgr_compute_wait_deadline __P((DB_ENV*, + * PUBLIC: struct timespec *, db_timeout_t)); + */ +void +__repmgr_compute_wait_deadline(dbenv, result, wait) + DB_ENV *dbenv; + struct timespec *result; + db_timeout_t wait; +{ + u_int32_t secs, usecs; + + /* + * Start with "now"; then add the "wait" offset. + */ + __os_clock(dbenv, &secs, &usecs); + + if (wait > 1000000) { + secs += wait / 1000000; + usecs += wait % 1000000; + } else + usecs += wait; + + if (usecs > 1000000) { + secs++; + usecs -= 1000000; + } + + result->tv_sec = (time_t)secs; + result->tv_nsec = (long)(usecs * 1000); +} + +/* + * PUBLIC: int __repmgr_init_sync __P((DB_ENV *, DB_REP *)); + * + * Allocate/initialize all data necessary for thread synchronization. This + * should be an all-or-nothing affair. Other than here and in _close_sync there + * should never be a time when these resources aren't either all allocated or + * all freed. If that's true, then we can safely use the values of the file + * descriptor(s) to keep track of which it is. + */ +int +__repmgr_init_sync(dbenv, db_rep) + DB_ENV *dbenv; + DB_REP *db_rep; +{ + int ret, mutex_inited, ack_inited, elect_inited, queue_inited, + file_desc[2]; + + COMPQUIET(dbenv, NULL); + + mutex_inited = ack_inited = elect_inited = queue_inited = FALSE; + + if ((ret = pthread_mutex_init(&db_rep->mutex, NULL)) != 0) + goto err; + mutex_inited = TRUE; + + if ((ret = pthread_cond_init(&db_rep->ack_condition, NULL)) != 0) + goto err; + ack_inited = TRUE; + + if ((ret = pthread_cond_init(&db_rep->check_election, NULL)) != 0) + goto err; + elect_inited = TRUE; + + if ((ret = pthread_cond_init(&db_rep->queue_nonempty, NULL)) != 0) + goto err; + queue_inited = TRUE; + + if ((ret = pipe(file_desc)) == -1) { + ret = errno; + goto err; + } + + db_rep->read_pipe = file_desc[0]; + db_rep->write_pipe = file_desc[1]; + return (0); +err: + if (queue_inited) + (void)pthread_cond_destroy(&db_rep->queue_nonempty); + if (elect_inited) + (void)pthread_cond_destroy(&db_rep->check_election); + if (ack_inited) + (void)pthread_cond_destroy(&db_rep->ack_condition); + if (mutex_inited) + (void)pthread_mutex_destroy(&db_rep->mutex); + db_rep->read_pipe = db_rep->write_pipe = -1; + + return (ret); +} + +/* + * PUBLIC: int __repmgr_close_sync __P((DB_ENV *)); + * + * Frees the thread synchronization data within a repmgr struct, in a + * platform-specific way. + */ +int +__repmgr_close_sync(dbenv) + DB_ENV *dbenv; +{ + DB_REP *db_rep; + int ret, t_ret; + + db_rep = dbenv->rep_handle; + + if (!(REPMGR_SYNC_INITED(db_rep))) + return (0); + + ret = pthread_cond_destroy(&db_rep->queue_nonempty); + + if ((t_ret = pthread_cond_destroy(&db_rep->check_election)) != 0 && + ret == 0) + ret = t_ret; + + if ((t_ret = pthread_cond_destroy(&db_rep->ack_condition)) != 0 && + ret == 0) + ret = t_ret; + + if ((t_ret = pthread_mutex_destroy(&db_rep->mutex)) != 0 && + ret == 0) + ret = t_ret; + + if (close(db_rep->read_pipe) == -1 && ret == 0) + ret = errno; + if (close(db_rep->write_pipe) == -1 && ret == 0) + ret = errno; + + db_rep->read_pipe = db_rep->write_pipe = -1; + return (ret); +} + +/* + * Performs net-related resource initialization other than memory initialization + * and allocation. A valid db_rep->listen_fd acts as the "all-or-nothing" + * sentinel signifying that these resources are allocated. + * + * PUBLIC: int __repmgr_net_init __P((DB_ENV *, DB_REP *)); + */ +int +__repmgr_net_init(dbenv, db_rep) + DB_ENV *dbenv; + DB_REP *db_rep; +{ + int ret; + struct sigaction sigact; + + if ((ret = __repmgr_listen(dbenv)) != 0) + return (ret); + + /* + * Make sure we're not ignoring SIGPIPE, 'cuz otherwise we'd be killed + * just for trying to write onto a socket that had been reset. + */ + if (sigaction(SIGPIPE, NULL, &sigact) == -1) { + ret = errno; + __db_err(dbenv, ret, "can't access signal handler"); + goto err; + } + /* + * If we need to change the sig handler, do so, and also set a flag so + * that we remember we did. + */ + if ((db_rep->chg_sig_handler = (sigact.sa_handler == SIG_DFL))) { + sigact.sa_handler = SIG_IGN; + sigact.sa_flags = 0; + if (sigaction(SIGPIPE, &sigact, NULL) == -1) { + ret = errno; + __db_err(dbenv, ret, "can't access signal handler"); + goto err; + } + } + return (0); + +err: + (void)closesocket(db_rep->listen_fd); + db_rep->listen_fd = INVALID_SOCKET; + return (ret); +} + +/* + * PUBLIC: int __repmgr_lock_mutex __P((mgr_mutex_t *)); + */ +int +__repmgr_lock_mutex(mutex) + mgr_mutex_t *mutex; +{ + return (pthread_mutex_lock(mutex)); +} + +/* + * PUBLIC: int __repmgr_unlock_mutex __P((mgr_mutex_t *)); + */ +int +__repmgr_unlock_mutex(mutex) + mgr_mutex_t *mutex; +{ + return (pthread_mutex_unlock(mutex)); +} + +/* + * Signals a condition variable. + * + * !!! + * Caller must hold mutex. + * + * PUBLIC: int __repmgr_signal __P((cond_var_t *)); + */ +int +__repmgr_signal(v) + cond_var_t *v; +{ + return (pthread_cond_broadcast(v)); +} + +/* + * PUBLIC: int __repmgr_wake_main_thread __P((DB_ENV*)); + */ +int +__repmgr_wake_main_thread(dbenv) + DB_ENV *dbenv; +{ + DB_REP *db_rep; + u_int8_t any_value; + + COMPQUIET(any_value, 0); + db_rep = dbenv->rep_handle; + + /* + * It doesn't matter what byte value we write. Just the appearance of a + * byte in the stream is enough to wake up the select() thread reading + * the pipe. + */ + if (write(db_rep->write_pipe, &any_value, 1) == -1) + return (errno); + return (0); +} + +/* + * PUBLIC: int __repmgr_writev __P((socket_t, db_iovec_t *, int, size_t *)); + */ +int +__repmgr_writev(fd, iovec, buf_count, byte_count_p) + socket_t fd; + db_iovec_t *iovec; + int buf_count; + size_t *byte_count_p; +{ + int nw; + + if ((nw = writev(fd, iovec, buf_count)) == -1) + return (errno); + *byte_count_p = (size_t)nw; + return (0); +} + +/* + * PUBLIC: int __repmgr_readv __P((socket_t, db_iovec_t *, int, size_t *)); + */ +int +__repmgr_readv(fd, iovec, buf_count, byte_count_p) + socket_t fd; + db_iovec_t *iovec; + int buf_count; + size_t *byte_count_p; +{ + ssize_t nw; + + if ((nw = readv(fd, iovec, buf_count)) == -1) + return (errno); + *byte_count_p = (size_t)nw; + return (0); +} + +/* + * Calculate the time duration from now til "when", in the form of a struct + * timeval (suitable for select()), clipping the result at 0 (i.e., avoid a + * negative result). + * + * PUBLIC: void __repmgr_timeval_diff_current + * PUBLIC: __P((DB_ENV *, repmgr_timeval_t *, select_timeout_t *)); + */ +void +__repmgr_timeval_diff_current(dbenv, when, result) + DB_ENV *dbenv; + repmgr_timeval_t *when; + select_timeout_t *result; +{ + repmgr_timeval_t now; + + __os_clock(dbenv, &now.tv_sec, &now.tv_usec); + if (__repmgr_timeval_cmp(when, &now) <= 0) + result->tv_sec = result->tv_usec = 0; + else { + /* + * Do the arithmetic; first see if we need to "borrow". + */ + if (when->tv_usec < now.tv_usec) { + when->tv_usec += 1000000; + when->tv_sec--; + } + result->tv_usec = (long)(when->tv_usec - now.tv_usec); + result->tv_sec = (time_t)(when->tv_sec - now.tv_sec); + } +} + +/* + * PUBLIC: int __repmgr_select_loop __P((DB_ENV *)); + */ +int +__repmgr_select_loop(dbenv) + DB_ENV *dbenv; +{ + DB_REP *db_rep; + REPMGR_CONNECTION *conn, *next; + REPMGR_RETRY *retry; + select_timeout_t timeout, *timeout_p; + fd_set reads, writes; + int ret, flow_control, maxfd, nready; + u_int8_t buf[10]; /* arbitrary size */ + + /* TODO: turn this on when the input queue gets too big. */ + flow_control = FALSE; + + db_rep = dbenv->rep_handle; + /* + * Almost this entire thread operates while holding the mutex. But note + * that it never blocks, except in the call to select() (which is the + * one place we relinquish the mutex). + */ + LOCK_MUTEX(db_rep->mutex); + if ((ret = __repmgr_first_try_connections(dbenv)) != 0) + goto out; + for (;;) { + FD_ZERO(&reads); + FD_ZERO(&writes); + + /* + * Always ask for input on listening socket and signalling + * pipe. + */ + FD_SET((u_int)db_rep->listen_fd, &reads); + maxfd = db_rep->listen_fd; + + FD_SET((u_int)db_rep->read_pipe, &reads); + if (db_rep->read_pipe > maxfd) + maxfd = db_rep->read_pipe; + + /* + * Examine all connections to see what sort of I/O to ask for on + * each one. The TAILQ_FOREACH macro would be suitable here, + * except that it doesn't allow unlinking the current element, + * which is needed for cleanup_connection. + */ + for (conn = TAILQ_FIRST(&db_rep->connections); + conn != NULL; + conn = next) { + next = TAILQ_NEXT(conn, entries); + if (F_ISSET(conn, CONN_DEFUNCT)) { + __repmgr_cleanup_connection(dbenv, conn); + continue; + } + + if (F_ISSET(conn, CONN_CONNECTING)) { + FD_SET((u_int)conn->fd, &reads); + FD_SET((u_int)conn->fd, &writes); + if (conn->fd > maxfd) + maxfd = conn->fd; + continue; + } + + if (!STAILQ_EMPTY(&conn->outbound_queue)) { + FD_SET((u_int)conn->fd, &writes); + if (conn->fd > maxfd) + maxfd = conn->fd; + } + /* + * If we haven't yet gotten site's handshake, then read + * from it even if we're flow-controlling. + */ + if (!flow_control || !IS_VALID_EID(conn->eid)) { + FD_SET((u_int)conn->fd, &reads); + if (conn->fd > maxfd) + maxfd = conn->fd; + } + } + /* + * Decide how long to wait based on when it will next be time to + * retry an idle connection. (List items are in order, so we + * only have to examine the first one.) + */ + if (TAILQ_EMPTY(&db_rep->retries)) + timeout_p = NULL; + else { + retry = TAILQ_FIRST(&db_rep->retries); + + timeout_p = &timeout; + __repmgr_timeval_diff_current( + dbenv, &retry->time, timeout_p); + } + + UNLOCK_MUTEX(db_rep->mutex); + + if ((ret = select(maxfd + 1, &reads, &writes, NULL, timeout_p)) + == -1) { + switch (ret = errno) { + case EINTR: + case EWOULDBLOCK: + LOCK_MUTEX(db_rep->mutex); + continue; /* simply retry */ + default: + __db_err(dbenv, ret, "select"); + return (ret); + } + } + nready = ret; + + LOCK_MUTEX(db_rep->mutex); + if ((ret = __repmgr_retry_connections(dbenv)) != 0) + goto out; + if (nready == 0) + continue; + + /* + * Traverse the linked list. Almost like TAILQ_FOREACH, except + * that we need the ability to unlink an element along the way. + */ + for (conn = TAILQ_FIRST(&db_rep->connections); + conn != NULL; + conn = next) { + next = TAILQ_NEXT(conn, entries); + if (F_ISSET(conn, CONN_CONNECTING)) { + if (FD_ISSET((u_int)conn->fd, &reads) || + FD_ISSET((u_int)conn->fd, &writes)) { + if ((ret = finish_connecting(dbenv, + conn)) == DB_REP_UNAVAIL) { + if ((ret = + __repmgr_bust_connection( + dbenv, conn, TRUE)) != 0) + goto out; + } else if (ret != 0) + goto out; + } + continue; + } + + /* + * Here, the site is connected, and the FD_SET's are + * valid. + */ + if (FD_ISSET((u_int)conn->fd, &writes)) { + if ((ret = __repmgr_write_some( + dbenv, conn)) == DB_REP_UNAVAIL) { + if ((ret = + __repmgr_bust_connection(dbenv, + conn, TRUE)) != 0) + goto out; + continue; + } else if (ret != 0) + goto out; + } + + if (!flow_control && + FD_ISSET((u_int)conn->fd, &reads)) { + if ((ret = __repmgr_read_from_site(dbenv, conn)) + == DB_REP_UNAVAIL) { + if ((ret = + __repmgr_bust_connection(dbenv, + conn, TRUE)) != 0) + goto out; + continue; + } else if (ret != 0) + goto out; + } + } + + /* + * Read any bytes in the signalling pipe. Note that we don't + * actually need to do anything with them; they're just there to + * wake us up when necessary. + */ + if (FD_ISSET((u_int)db_rep->read_pipe, &reads)) { + if (read(db_rep->read_pipe, buf, sizeof(buf)) <= 0) { + ret = errno; + goto out; + } else if (db_rep->finished) { + ret = 0; + goto out; + } + } + if (FD_ISSET((u_int)db_rep->listen_fd, &reads) && + (ret = __repmgr_accept(dbenv)) != 0) + goto out; + } +out: + UNLOCK_MUTEX(db_rep->mutex); + return (ret); +} + +static int +finish_connecting(dbenv, conn) + DB_ENV *dbenv; + REPMGR_CONNECTION *conn; +{ + DB_REP *db_rep; + REPMGR_SITE *site; + socklen_t len; + SITE_STRING_BUFFER buffer; + u_int eid; + int error, ret; + + len = sizeof(error); + if (getsockopt( + conn->fd, SOL_SOCKET, SO_ERROR, (sockopt_t)&error, &len) < 0) + goto err_rpt; + if (error) { + errno = error; + goto err_rpt; + } + + F_CLR(conn, CONN_CONNECTING); + return (__repmgr_send_handshake(dbenv, conn)); + +err_rpt: + db_rep = dbenv->rep_handle; + + DB_ASSERT(dbenv, IS_VALID_EID(conn->eid)); + eid = (u_int)conn->eid; + + site = SITE_FROM_EID(eid); + __db_err(dbenv, errno, + "connecting to %s", __repmgr_format_site_loc(site, buffer)); + + /* If we've exhausted the list of possible addresses, give up. */ + if (ADDR_LIST_NEXT(&site->net_addr) == NULL) + return (DB_REP_UNAVAIL); + + /* + * This is just like a little mini-"bust_connection", except that we + * don't reschedule for later, 'cuz we're just about to try again right + * now. + */ + DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections)); + __repmgr_cleanup_connection(dbenv, conn); + ret = __repmgr_connect_site(dbenv, eid); + DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL); + return (ret); +} diff --git a/db/repmgr/repmgr_queue.c b/db/repmgr/repmgr_queue.c new file mode 100644 index 000000000..be9adc2e7 --- /dev/null +++ b/db/repmgr/repmgr_queue.c @@ -0,0 +1,158 @@ +/*- + * See the file LICENSE for redistribution information. + * + * Copyright (c) 2006 + * Oracle Corporation. All rights reserved. + * + * $Id: repmgr_queue.c,v 1.6 2006/08/24 14:46:26 bostic Exp $ + */ + +#include "db_config.h" + +#define __INCLUDE_NETWORKING 1 +#include "db_int.h" + +typedef STAILQ_HEAD(__repmgr_q_header, __repmgr_message) QUEUE_HEADER; +struct __repmgr_queue { + int size; + QUEUE_HEADER header; +}; + +/* + * PUBLIC: int __repmgr_queue_create __P((DB_ENV *, DB_REP *)); + */ +int +__repmgr_queue_create(dbenv, db_rep) + DB_ENV *dbenv; + DB_REP *db_rep; +{ + REPMGR_QUEUE *q; + int ret; + + COMPQUIET(dbenv, NULL); + + if ((ret = __os_calloc(dbenv, 1, sizeof(REPMGR_QUEUE), &q)) != 0) + return (ret); + q->size = 0; + STAILQ_INIT(&q->header); + db_rep->input_queue = q; + return (0); +} + +/* + * Frees not only the queue header, but also any messages that may be on it, + * along with their data buffers. + * + * PUBLIC: void __repmgr_queue_destroy __P((DB_ENV *)); + */ +void +__repmgr_queue_destroy(dbenv) + DB_ENV *dbenv; +{ + REPMGR_QUEUE *q; + REPMGR_MESSAGE *m; + + if ((q = dbenv->rep_handle->input_queue) == NULL) + return; + + while (!STAILQ_EMPTY(&q->header)) { + m = STAILQ_FIRST(&q->header); + STAILQ_REMOVE_HEAD(&q->header, entries); + __os_free(dbenv, m); + } + __os_free(dbenv, q); +} + +/* + * PUBLIC: int __repmgr_queue_get __P((DB_ENV *, REPMGR_MESSAGE **)); + * + * Get the first input message from the queue and return it to the caller. The + * caller hereby takes responsibility for the entire message buffer, and should + * free it when done. + * + * Note that caller is NOT expected to hold the mutex. This is asymmetric with + * put(), because put() is expected to be called in a loop after select, where + * it's already necessary to be holding the mutex. + */ +int +__repmgr_queue_get(dbenv, msgp) + DB_ENV *dbenv; + REPMGR_MESSAGE **msgp; +{ + DB_REP *db_rep; + REPMGR_QUEUE *q; + REPMGR_MESSAGE *m; + int ret; + + ret = 0; + db_rep = dbenv->rep_handle; + q = db_rep->input_queue; + + LOCK_MUTEX(db_rep->mutex); + while (STAILQ_EMPTY(&q->header) && !db_rep->finished) { +#ifdef DB_WIN32 + if (!ResetEvent(db_rep->queue_nonempty)) { + ret = GetLastError(); + goto err; + } + if (SignalObjectAndWait(db_rep->mutex, db_rep->queue_nonempty, + INFINITE, FALSE) != WAIT_OBJECT_0) { + ret = GetLastError(); + goto err; + } + LOCK_MUTEX(db_rep->mutex); +#else + if ((ret = pthread_cond_wait(&db_rep->queue_nonempty, + &db_rep->mutex)) != 0) + goto err; +#endif + } + if (db_rep->finished) + ret = DB_REP_UNAVAIL; + else { + m = STAILQ_FIRST(&q->header); + STAILQ_REMOVE_HEAD(&q->header, entries); + q->size--; + *msgp = m; + } + +err: + UNLOCK_MUTEX(db_rep->mutex); + return (ret); +} + +/* + * PUBLIC: int __repmgr_queue_put __P((DB_ENV *, REPMGR_MESSAGE *)); + * + * !!! + * Caller must hold repmgr->mutex. + */ +int +__repmgr_queue_put(dbenv, msg) + DB_ENV *dbenv; + REPMGR_MESSAGE *msg; +{ + DB_REP *db_rep; + REPMGR_QUEUE *q; + + db_rep = dbenv->rep_handle; + q = db_rep->input_queue; + + STAILQ_INSERT_TAIL(&q->header, msg, entries); + q->size++; + + return (__repmgr_signal(&db_rep->queue_nonempty)); +} + +/* + * PUBLIC: int __repmgr_queue_size __P((DB_ENV *)); + * + * !!! + * Caller must hold repmgr->mutex. + */ +int +__repmgr_queue_size(dbenv) + DB_ENV *dbenv; +{ + return (dbenv->rep_handle->input_queue->size); +} diff --git a/db/repmgr/repmgr_sel.c b/db/repmgr/repmgr_sel.c new file mode 100644 index 000000000..bbb29b0ee --- /dev/null +++ b/db/repmgr/repmgr_sel.c @@ -0,0 +1,875 @@ +/*- + * See the file LICENSE for redistribution information. + * + * Copyright (c) 2005 + * Oracle Corporation. All rights reserved. + * + * $Id: repmgr_sel.c,v 1.25 2006/09/19 14:14:11 mjc Exp $ + */ + +#include "db_config.h" + +#define __INCLUDE_NETWORKING 1 +#include "db_int.h" + +static int __repmgr_connect __P((DB_ENV*, socket_t *, REPMGR_SITE *)); +static int record_ack __P((DB_ENV *, REPMGR_SITE *, DB_REPMGR_ACK *)); +static int dispatch_phase_completion __P((DB_ENV *, REPMGR_CONNECTION *)); +static int notify_handshake __P((DB_ENV *, REPMGR_CONNECTION *)); + +/* + * PUBLIC: void *__repmgr_select_thread __P((void *)); + */ +void * +__repmgr_select_thread(args) + void *args; +{ + DB_ENV *dbenv = args; + int ret; + + if ((ret = __repmgr_select_loop(dbenv)) != 0) { + __db_err(dbenv, ret, "select loop failed"); + __repmgr_thread_failure(dbenv, ret); + } + return (NULL); +} + +/* + * PUBLIC: int __repmgr_accept __P((DB_ENV *)); + */ +int +__repmgr_accept(dbenv) + DB_ENV *dbenv; +{ + DB_REP *db_rep; + REPMGR_CONNECTION *conn; + struct sockaddr_in siaddr; + socklen_t addrlen; + socket_t s; + int ret; +#ifdef DB_WIN32 + WSAEVENT event_obj; +#endif +#ifdef DIAGNOSTIC + DB_MSGBUF mb; +#endif + + db_rep = dbenv->rep_handle; + addrlen = sizeof(siaddr); + if ((s = accept(db_rep->listen_fd, (struct sockaddr *)&siaddr, + &addrlen)) == -1) { + /* + * Some errors are innocuous and so should be ignored. MSDN + * Library documents the Windows ones; the Unix ones are + * advocated in Stevens' UNPv1, section 16.6; and Linux + * Application Development, p. 416. + */ + switch (ret = net_errno) { +#ifdef DB_WIN32 + case WSAECONNRESET: + case WSAEWOULDBLOCK: +#else + case EINTR: + case EWOULDBLOCK: + case ECONNABORTED: + case ENETDOWN: +#ifdef EPROTO + case EPROTO: +#endif + case ENOPROTOOPT: + case EHOSTDOWN: +#ifdef ENONET + case ENONET: +#endif + case EHOSTUNREACH: + case EOPNOTSUPP: + case ENETUNREACH: +#endif + RPRINT(dbenv, (dbenv, &mb, + "accept error %d considered innocuous", ret)); + return (0); + default: + __db_err(dbenv, ret, "accept error"); + return (ret); + } + } + RPRINT(dbenv, (dbenv, &mb, "accepted a new connection")); + + if ((ret = __repmgr_set_nonblocking(s)) != 0) { + __db_err(dbenv, ret, "can't set nonblock after accept"); + (void)closesocket(s); + return (ret); + } + +#ifdef DB_WIN32 + if ((event_obj = WSACreateEvent()) == WSA_INVALID_EVENT) { + ret = net_errno; + __db_err(dbenv, ret, "can't create WSA event"); + (void)closesocket(s); + return (ret); + } + if (WSAEventSelect(s, event_obj, FD_READ|FD_CLOSE) == SOCKET_ERROR) { + ret = net_errno; + __db_err(dbenv, ret, "can't set desired event bits"); + (void)WSACloseEvent(event_obj); + (void)closesocket(s); + return (ret); + } +#endif + if ((ret = __repmgr_new_connection(dbenv, &conn, s, 0)) != 0) { +#ifdef DB_WIN32 + (void)WSACloseEvent(event_obj); +#endif + (void)closesocket(s); + return (ret); + } + conn->eid = -1; +#ifdef DB_WIN32 + conn->event_object = event_obj; +#endif + + switch (ret = __repmgr_send_handshake(dbenv, conn)) { + case 0: + return (0); + case DB_REP_UNAVAIL: + return (__repmgr_bust_connection(dbenv, conn, TRUE)); + default: + return (ret); + } +} + +/* + * Initiates connection attempts for any sites on the idle list whose retry + * times have expired. + * + * PUBLIC: int __repmgr_retry_connections __P((DB_ENV *)); + * + * !!! + * Assumes caller holds the mutex. + */ +int +__repmgr_retry_connections(dbenv) + DB_ENV *dbenv; +{ + DB_REP *db_rep; + REPMGR_RETRY *retry; + repmgr_netaddr_t *addr; + repmgr_timeval_t now; + ADDRINFO *list; + u_int eid; + int ret; + + db_rep = dbenv->rep_handle; + __os_clock(dbenv, &now.tv_sec, &now.tv_usec); + + while (!TAILQ_EMPTY(&db_rep->retries)) { + retry = TAILQ_FIRST(&db_rep->retries); + if (__repmgr_timeval_cmp(&retry->time, &now) > 0) + break; /* since items are in time order */ + + TAILQ_REMOVE(&db_rep->retries, retry, entries); + + eid = retry->eid; + __os_free(dbenv, retry); + + /* + * If have never yet successfully resolved this site's host + * name, try to do so now. + * + * (Throughout all the rest of repmgr, we almost never do any + * sort of blocking operation in the select thread. This is the + * sole exception to that rule. Fortunately, it should rarely + * happen. It only happens for a site that we only learned + * about because it connected to us: not only were we not + * configured to know about it, but we also never got a NEWSITE + * message about it. And even then only after the connection + * fails and we want to retry it from this end.) + */ + addr = &SITE_FROM_EID(eid)->net_addr; + if (ADDR_LIST_FIRST(addr) == NULL) { + if (__repmgr_getaddr(dbenv, + addr->host, addr->port, 0, &list) == 0) { + addr->address_list = list; + (void)ADDR_LIST_FIRST(addr); + } else if ((ret = __repmgr_schedule_connection_attempt( + dbenv, eid, FALSE)) != 0) + return (ret); + else + continue; + } + if ((ret = __repmgr_connect_site(dbenv, eid)) != 0) + return (ret); + } + return (0); +} + +/* + * PUBLIC: int __repmgr_first_try_connections __P((DB_ENV *)); + * + * !!! + * Assumes caller holds the mutex. + */ +int +__repmgr_first_try_connections(dbenv) + DB_ENV *dbenv; +{ + DB_REP *db_rep; + u_int eid; + int ret; + + db_rep = dbenv->rep_handle; + for (eid=0; eid<db_rep->site_cnt; eid++) { + ADDR_LIST_FIRST(&SITE_FROM_EID(eid)->net_addr); + if ((ret = __repmgr_connect_site(dbenv, eid)) != 0) + return (ret); + } + return (0); +} + +/* + * Tries to establish a connection with the site indicated by the given eid, + * starting with the "current" element of its address list and trying as many + * addresses as necessary until the list is exhausted. + * + * PUBLIC: int __repmgr_connect_site __P((DB_ENV *, u_int eid)); + */ +int +__repmgr_connect_site(dbenv, eid) + DB_ENV *dbenv; + u_int eid; +{ + DB_REP *db_rep; + REPMGR_SITE *site; + REPMGR_CONNECTION *con; + socket_t s; + u_int32_t flags; + int ret; +#ifdef DB_WIN32 + long desired_event; + WSAEVENT event_obj; +#endif + + db_rep = dbenv->rep_handle; + site = SITE_FROM_EID(eid); + + flags = 0; + switch (ret = __repmgr_connect(dbenv, &s, site)) { + case 0: + flags = 0; +#ifdef DB_WIN32 + desired_event = FD_READ|FD_CLOSE; +#endif + break; + case INPROGRESS: + flags = CONN_CONNECTING; +#ifdef DB_WIN32 + desired_event = FD_CONNECT; +#endif + break; + default: + return ( + __repmgr_schedule_connection_attempt(dbenv, eid, FALSE)); + } + +#ifdef DB_WIN32 + if ((event_obj = WSACreateEvent()) == WSA_INVALID_EVENT) { + ret = net_errno; + __db_err(dbenv, ret, "can't create WSA event"); + (void)closesocket(s); + return (ret); + } + if (WSAEventSelect(s, event_obj, desired_event) == SOCKET_ERROR) { + ret = net_errno; + __db_err(dbenv, ret, "can't set desired event bits"); + (void)WSACloseEvent(event_obj); + (void)closesocket(s); + return (ret); + } +#endif + + if ((ret = __repmgr_new_connection(dbenv, &con, s, flags)) + != 0) { +#ifdef DB_WIN32 + (void)WSACloseEvent(event_obj); +#endif + (void)closesocket(s); + return (ret); + } +#ifdef DB_WIN32 + con->event_object = event_obj; +#endif + + if (flags == 0) { + switch (ret = __repmgr_send_handshake(dbenv, con)) { + case 0: + break; + case DB_REP_UNAVAIL: + return (__repmgr_bust_connection(dbenv, con, TRUE)); + default: + return (ret); + } + } + + con->eid = (int)eid; + + site->ref.conn = con; + site->state = SITE_CONNECTED; + return (0); +} + +static int +__repmgr_connect(dbenv, socket_result, site) + DB_ENV *dbenv; + socket_t *socket_result; + REPMGR_SITE *site; +{ + repmgr_netaddr_t *addr; + ADDRINFO *ai; + socket_t s; + char *why; + int ret; + SITE_STRING_BUFFER buffer; +#ifdef DIAGNOSTIC + DB_MSGBUF mb; +#endif + + /* + * Lint doesn't know about DB_ASSERT, so it can't tell that this + * loop will always get executed at least once, giving 'why' a value. + */ + COMPQUIET(why, ""); + addr = &site->net_addr; + ai = ADDR_LIST_CURRENT(addr); + DB_ASSERT(dbenv, ai != NULL); + for (; ai != NULL; ai = ADDR_LIST_NEXT(addr)) { + + if ((s = socket(ai->ai_family, + ai->ai_socktype, ai->ai_protocol)) == SOCKET_ERROR) { + why = "can't create socket to connect"; + continue; + } + + if ((ret = __repmgr_set_nonblocking(s)) != 0) { + __db_err(dbenv, + ret, "can't make nonblock socket to connect"); + (void)closesocket(s); + return (ret); + } + + if (connect(s, ai->ai_addr, (socklen_t)ai->ai_addrlen) != 0) + ret = net_errno; + + if (ret == 0 || ret == INPROGRESS) { + *socket_result = s; + RPRINT(dbenv, (dbenv, &mb, + "init connection to %s with result %d", + __repmgr_format_site_loc(site, buffer), ret)); + return (ret); + } + + why = "connection failed"; + (void)closesocket(s); + } + + /* We've exhausted all possible addresses. */ + ret = net_errno; + __db_err(dbenv, ret, "%s to %s", why, + __repmgr_format_site_loc(site, buffer)); + return (ret); +} + +/* + * PUBLIC: int __repmgr_send_handshake __P((DB_ENV *, REPMGR_CONNECTION *)); + */ +int +__repmgr_send_handshake(dbenv, conn) + DB_ENV *dbenv; + REPMGR_CONNECTION *conn; +{ + DB_REP *db_rep; + REP *rep; + repmgr_netaddr_t *my_addr; + DB_REPMGR_HANDSHAKE buffer; + DBT cntrl, rec; + + db_rep = dbenv->rep_handle; + rep = db_rep->region; + my_addr = &db_rep->my_addr; + + /* + * Remind us that we need to fix priority on the rep API not to be an + * int, if we're going to pass it across the wire. + */ + DB_ASSERT(dbenv, sizeof(u_int32_t) >= sizeof(int)); + + buffer.version = DB_REPMGR_VERSION; + /* TODO: using network byte order is pointless if we only do it here. */ + buffer.priority = htonl((u_int32_t)rep->priority); + buffer.port = my_addr->port; + cntrl.data = &buffer; + cntrl.size = sizeof(buffer); + + DB_SET_DBT(rec, my_addr->host, strlen(my_addr->host) + 1); + + return (__repmgr_send_one(dbenv, conn, REPMGR_HANDSHAKE, &cntrl, &rec)); +} + +/* + * PUBLIC: int __repmgr_read_from_site __P((DB_ENV *, REPMGR_CONNECTION *)); + * + * !!! + * Caller is assumed to hold repmgr->mutex, 'cuz we call queue_put() from here. + */ +int +__repmgr_read_from_site(dbenv, conn) + DB_ENV *dbenv; + REPMGR_CONNECTION *conn; +{ + SITE_STRING_BUFFER buffer; + size_t nr; + int ret; + + /* + * Keep reading pieces as long as we're making some progress, or until + * we complete the current read phase. + */ + for (;;) { + if ((ret = __repmgr_readv(conn->fd, + &conn->iovecs.vectors[conn->iovecs.offset], + conn->iovecs.count - conn->iovecs.offset, &nr)) != 0) { + switch (ret) { +#ifndef DB_WIN32 + case EINTR: + continue; +#endif + case WOULDBLOCK: + return (0); + default: + (void)__repmgr_format_eid_loc(dbenv->rep_handle, + conn->eid, buffer); + __db_err(dbenv, ret, + "can't read from %s", buffer); + return (DB_REP_UNAVAIL); + } + } + + if (nr > 0) { + if (__repmgr_update_consumed(&conn->iovecs, nr)) + return (dispatch_phase_completion(dbenv, + conn)); + } else { + (void)__repmgr_format_eid_loc(dbenv->rep_handle, + conn->eid, buffer); + __db_errx(dbenv, "EOF on connection from %s", buffer); + return (DB_REP_UNAVAIL); + } + } +} + +/* + * Handles whatever needs to be done upon the completion of a reading phase on a + * given connection. + */ +static int +dispatch_phase_completion(dbenv, conn) + DB_ENV *dbenv; + REPMGR_CONNECTION *conn; +{ +#define MEM_ALIGN sizeof(double) + DB_REP *db_rep; + DB_REPMGR_ACK *ack; + REPMGR_SITE *site; + DB_REPMGR_HANDSHAKE *handshake; + REPMGR_RETRY *retry; + repmgr_netaddr_t addr; + DBT *dbt; + u_int32_t control_size, rec_size; + size_t memsize, control_offset, rec_offset; + void *membase; + char *host; + u_int port; + int ret, eid; +#ifdef DIAGNOSTIC + DB_MSGBUF mb; +#endif + + db_rep = dbenv->rep_handle; + switch (conn->reading_phase) { + case SIZES_PHASE: + /* + * We've received the header: a message type and the lengths of + * the two pieces of the message. Set up buffers to read the + * two pieces. + */ + + if (conn->msg_type != REPMGR_HANDSHAKE && + !IS_VALID_EID(conn->eid)) { + __db_errx(dbenv, + "expected handshake as first msg from passively connected site"); + return (DB_REP_UNAVAIL); + } + + __repmgr_iovec_init(&conn->iovecs); + control_size = ntohl(conn->control_size_buf); + rec_size = ntohl(conn->rec_size_buf); + if (conn->msg_type == REPMGR_REP_MESSAGE) { + /* + * Allocate a block of memory large enough to hold a + * DB_REPMGR_MESSAGE wrapper, plus the (one or) two DBT + * data areas that it points to. Start by calculating + * the total memory needed, rounding up for the start of + * each DBT, to ensure possible alignment requirements. + */ + /* + * TODO: Keith says we don't need to mess with this: put + * the burden on base replication code. + */ + memsize = (size_t) + DB_ALIGN(sizeof(REPMGR_MESSAGE), MEM_ALIGN); + control_offset = memsize; + memsize += control_size; + if (rec_size > 0) { + memsize = (size_t)DB_ALIGN(memsize, MEM_ALIGN); + rec_offset = memsize; + memsize += rec_size; + } else + COMPQUIET(rec_offset, 0); + if ((ret = __os_malloc(dbenv, memsize, &membase)) != 0) + return (ret); + conn->input.rep_message = membase; + memset(&conn->input.rep_message->control, 0, + sizeof(DBT)); + memset(&conn->input.rep_message->rec, 0, sizeof(DBT)); + conn->input.rep_message->originating_eid = conn->eid; + conn->input.rep_message->control.size = control_size; + conn->input.rep_message->control.data = + (u_int8_t*)membase + control_offset; + __repmgr_add_buffer(&conn->iovecs, + conn->input.rep_message->control.data, + control_size); + + conn->input.rep_message->rec.size = rec_size; + if (rec_size > 0) { + conn->input.rep_message->rec.data = + (u_int8_t*)membase + rec_offset; + __repmgr_add_buffer(&conn->iovecs, + conn->input.rep_message->rec.data, + rec_size); + } else + conn->input.rep_message->rec.data = NULL; + + } else { + if (control_size == 0) { + __db_errx( + dbenv, "illegal size for non-rep msg"); + return (DB_REP_UNAVAIL); + } + conn->input.repmgr_msg.cntrl.size = control_size; + conn->input.repmgr_msg.rec.size = rec_size; + + /* + * TODO: consider allocating space for ack's just once + * (lazily?), or even providing static space for it in + * the conn structure itself, thus avoiding a bit of + * thrashing in the memory pool. If we do that, then of + * course we must get rid of the corresponding call to + * free(), below. + */ + dbt = &conn->input.repmgr_msg.cntrl; + dbt->size = control_size; + if ((ret = __os_malloc(dbenv, control_size, + &dbt->data)) != 0) + return (ret); + __repmgr_add_dbt(&conn->iovecs, dbt); + + dbt = &conn->input.repmgr_msg.rec; + if ((dbt->size = rec_size) > 0) { + if ((ret = __os_malloc(dbenv, rec_size, + &dbt->data)) != 0) { + __os_free(dbenv, + conn->input.repmgr_msg.cntrl.data); + return (ret); + } + __repmgr_add_dbt(&conn->iovecs, dbt); + } + } + + conn->reading_phase = DATA_PHASE; + break; + + case DATA_PHASE: + /* + * We have a complete message, so process it. Acks and + * handshakes get processed here, in line. Regular rep messages + * get posted to a queue, to be handled by a thread from the + * message thread pool. + */ + switch (conn->msg_type) { + case REPMGR_ACK: + /* + * Extract the LSN. Save it only if it is an + * improvement over what the site has already ack'ed. + */ + ack = conn->input.repmgr_msg.cntrl.data; + if (conn->input.repmgr_msg.cntrl.size != sizeof(*ack) || + conn->input.repmgr_msg.rec.size != 0) { + __db_errx(dbenv, "bad ack msg size"); + return (DB_REP_UNAVAIL); + } + if ((ret = record_ack(dbenv, SITE_FROM_EID(conn->eid), + ack)) != 0) + return (ret); + __os_free(dbenv, conn->input.repmgr_msg.cntrl.data); + break; + + case REPMGR_HANDSHAKE: + handshake = conn->input.repmgr_msg.cntrl.data; + if (conn->input.repmgr_msg.cntrl.size >= + sizeof(handshake->version) && + handshake->version != DB_REPMGR_VERSION) { + __db_errx(dbenv, + "mismatched repmgr message protocol version (%lu)", + (u_long)handshake->version); + return (DB_REP_UNAVAIL); + } + if (conn->input.repmgr_msg.cntrl.size != + sizeof(*handshake) || + conn->input.repmgr_msg.rec.size == 0) { + __db_errx(dbenv, "bad handshake msg size"); + return (DB_REP_UNAVAIL); + } + + port = handshake->port; + host = conn->input.repmgr_msg.rec.data; + host[conn->input.repmgr_msg.rec.size-1] = '\0'; + + RPRINT(dbenv, (dbenv, &mb, + "got handshake %s:%u, pri %lu", host, port, + (u_long)ntohl(handshake->priority))); + + if (IS_VALID_EID(conn->eid)) { + /* + * We must have initiated this as an outgoing + * connection, since we already know the EID. + * All we need from the handshake is the + * priority. + */ + site = SITE_FROM_EID(conn->eid); + RPRINT(dbenv, (dbenv, &mb, + "handshake from connection to %s:%lu", + site->net_addr.host, + (u_long)site->net_addr.port)); + } else { + if (IS_VALID_EID(eid = + __repmgr_find_site(dbenv, host, port))) { + site = SITE_FROM_EID(eid); + if (site->state == SITE_IDLE) { + RPRINT(dbenv, (dbenv, &mb, + "handshake from previously idle site")); + retry = site->ref.retry; + TAILQ_REMOVE(&db_rep->retries, + retry, entries); + __os_free(dbenv, retry); + + conn->eid = eid; + site->state = SITE_CONNECTED; + site->ref.conn = conn; + } else { + /* + * We got an incoming connection + * for a site we were already + * connected to; discard it. + */ + __db_errx(dbenv, + "redundant incoming connection will be ignored"); + return (DB_REP_UNAVAIL); + } + } else { + RPRINT(dbenv, (dbenv, &mb, + "handshake introduces unknown site")); + if ((ret = __repmgr_pack_netaddr( + dbenv, host, port, NULL, + &addr)) != 0) + return (ret); + if ((ret = __repmgr_new_site( + dbenv, &site, &addr, + SITE_CONNECTED)) != 0) { + __repmgr_cleanup_netaddr(dbenv, + &addr); + return (ret); + } + conn->eid = EID_FROM_SITE(site); + site->ref.conn = conn; + } + } + + /* TODO: change priority to be u_int32_t. */ + DB_ASSERT(dbenv, sizeof(int) == sizeof(u_int32_t)); + site->priority = (int)ntohl(handshake->priority); + + if ((ret = notify_handshake(dbenv, conn)) != 0) + return (ret); + + __os_free(dbenv, conn->input.repmgr_msg.cntrl.data); + __os_free(dbenv, conn->input.repmgr_msg.rec.data); + break; + + case REPMGR_REP_MESSAGE: + if ((ret = __repmgr_queue_put(dbenv, + conn->input.rep_message)) != 0) + return (ret); + /* + * The queue has taken over responsibility for the + * rep_message buffer, and will free it later. + */ + break; + + default: + __db_errx(dbenv, "unknown msg type rcvd: %d", + (int)conn->msg_type); + return (DB_REP_UNAVAIL); + } + + __repmgr_reset_for_reading(conn); + break; + + default: + DB_ASSERT(dbenv, FALSE); + } + + return (0); +} + +/* + * Performs any processing needed upon the receipt of a handshake. + * + * !!! + * Caller must hold mutex. + */ +static int +notify_handshake(dbenv, conn) + DB_ENV *dbenv; + REPMGR_CONNECTION *conn; +{ + DB_REP *db_rep; +#ifdef DIAGNOSTIC + DB_MSGBUF mb; +#endif + + COMPQUIET(conn, NULL); + + db_rep = dbenv->rep_handle; + /* + * If we're moping around wishing we knew who the master was, then + * getting in touch with another site might finally provide sufficient + * connectivity to find out. But just do this once, because otherwise + * we get messages while the subsequent rep_start operations are going + * on, and rep tosses them in that case. (TODO: this may need further + * refinement.) + */ + if (db_rep->master_eid == DB_EID_INVALID && !db_rep->done_one) { + db_rep->done_one = TRUE; + RPRINT(dbenv, (dbenv, &mb, + "handshake with no known master to wake election thread")); + return (__repmgr_init_election(dbenv, ELECT_REPSTART)); + } + return (0); +} + +static int +record_ack(dbenv, site, ack) + DB_ENV *dbenv; + REPMGR_SITE *site; + DB_REPMGR_ACK *ack; +{ + DB_REP *db_rep; + int ret; +#ifdef DIAGNOSTIC + DB_MSGBUF mb; + SITE_STRING_BUFFER buffer; +#endif + + db_rep = dbenv->rep_handle; + + /* Ignore stale acks. */ + if (ack->generation < db_rep->generation) { + RPRINT(dbenv, (dbenv, &mb, + "ignoring stale ack (%lu<%lu), from %s", + (u_long)ack->generation, (u_long)db_rep->generation, + __repmgr_format_site_loc(site, buffer))); + return (0); + } + RPRINT(dbenv, (dbenv, &mb, + "got ack [%lu][%lu](%lu) from %s", (u_long)ack->lsn.file, + (u_long)ack->lsn.offset, (u_long)ack->generation, + __repmgr_format_site_loc(site, buffer))); + + /* + * TODO: what about future ones? Ideally, you'd like to wake up any + * waiting send() threads and have them return DB_REP_OUTDATED or + * something. But a mechanism to do that would be messy, and it almost + * seems not worth it, since (1) this almost can't happen; and (2) if we + * just ignore it, eventually the send() calls will time out (or not), + * and as long as we don't mistakenly ack something. The only advantage + * to doing something is more timely failure notification to the + * application, in (what I think is) an extremely rare situation. + */ + if (ack->generation == db_rep->generation && + log_compare(&ack->lsn, &site->max_ack) == 1) { + memcpy(&site->max_ack, &ack->lsn, sizeof(DB_LSN)); + if ((ret = __repmgr_wake_waiting_senders(dbenv)) != 0) + return (ret); + } + return (0); +} + +/* + * PUBLIC: int __repmgr_write_some __P((DB_ENV *, REPMGR_CONNECTION *)); + */ +int +__repmgr_write_some(dbenv, conn) + DB_ENV *dbenv; + REPMGR_CONNECTION *conn; +{ + REPMGR_FLAT *msg; + QUEUED_OUTPUT *output; + int bytes, ret; + + while (!STAILQ_EMPTY(&conn->outbound_queue)) { + output = STAILQ_FIRST(&conn->outbound_queue); + msg = output->msg; + if ((bytes = send(conn->fd, &msg->data[output->offset], + (size_t)msg->length - output->offset, 0)) == SOCKET_ERROR) { + if ((ret = net_errno) == WOULDBLOCK) + return (0); + else { + __db_err(dbenv, ret, "writing data"); + return (DB_REP_UNAVAIL); + } + } + + if ((output->offset += (size_t)bytes) >= msg->length) { + STAILQ_REMOVE_HEAD(&conn->outbound_queue, entries); + __os_free(dbenv, output); + conn->out_queue_length--; + if (--msg->ref_count <= 0) + __os_free(dbenv, msg); + } + } + +#ifdef DB_WIN32 + /* + * With the queue now empty, it's time to relinquish ownership of this + * connection again, so that the next call to send() can write the + * message in line, instead of posting it to the queue for us. + */ + if (WSAEventSelect(conn->fd, conn->event_object, FD_READ|FD_CLOSE) + == SOCKET_ERROR) { + ret = net_errno; + __db_err(dbenv, ret, "can't remove FD_WRITE event bit"); + return (ret); + } +#endif + + return (0); +} diff --git a/db/repmgr/repmgr_stat.c b/db/repmgr/repmgr_stat.c new file mode 100644 index 000000000..cbd7113ee --- /dev/null +++ b/db/repmgr/repmgr_stat.c @@ -0,0 +1,116 @@ +/*- + * See the file LICENSE for redistribution information. + * + * Copyright (c) 2005-2006 + * Oracle Corporation. All rights reserved. + * + * $Id: repmgr_stat.c,v 1.28 2006/09/08 19:22:42 bostic Exp $ + */ + +#include "db_config.h" + +#define __INCLUDE_NETWORKING 1 +#include "db_int.h" + +/* + * PUBLIC: int __repmgr_site_list __P((DB_ENV *, u_int *, DB_REPMGR_SITE **)); + */ +int +__repmgr_site_list(dbenv, countp, listp) + DB_ENV *dbenv; + u_int *countp; + DB_REPMGR_SITE **listp; +{ + DB_REP *db_rep; + DB_REPMGR_SITE *status; + REPMGR_SITE *site; + size_t array_size, total_size; + u_int count, i; + int locked, ret; + char *name; + + db_rep = dbenv->rep_handle; + if (REPMGR_SYNC_INITED(db_rep)) { + LOCK_MUTEX(db_rep->mutex); + locked = TRUE; + } else + locked = FALSE; + + /* Initialize for empty list or error return. */ + ret = 0; + *countp = 0; + *listp = NULL; + + /* First, add up how much memory we need for the host names. */ + if ((count = db_rep->site_cnt) == 0) + goto err; + + array_size = sizeof(DB_REPMGR_SITE) * count; + total_size = array_size; + for (i = 0; i < count; i++) { + site = &db_rep->sites[i]; + + /* Make room for the NUL terminating byte. */ + total_size += strlen(site->net_addr.host) + 1; + } + + if ((ret = __os_umalloc(dbenv, total_size, &status)) != 0) + goto err; + + /* + * Put the storage for the host names after the array of structs. This + * way, the caller can free the whole thing in one single operation. + */ + name = (char *)((u_int8_t *)status + array_size); + for (i = 0; i < count; i++) { + site = &db_rep->sites[i]; + + status[i].eid = EID_FROM_SITE(site); + + status[i].host = name; + (void)strcpy(name, site->net_addr.host); + name += strlen(name) + 1; + + status[i].port = site->net_addr.port; + status[i].status = site->state == SITE_CONNECTED ? + DB_REPMGR_CONNECTED : DB_REPMGR_DISCONNECTED; + } + + *countp = count; + *listp = status; + +err: if (locked) + UNLOCK_MUTEX(db_rep->mutex); + return (ret); +} + +/* + * PUBLIC: int __repmgr_print_stats __P((DB_ENV *)); + */ +int +__repmgr_print_stats(dbenv) + DB_ENV *dbenv; +{ + DB_REPMGR_SITE *list; + u_int count, i; + int ret; + + if ((ret = __repmgr_site_list(dbenv, &count, &list)) != 0) + return (ret); + + if (count == 0) + return (0); + + __db_msg(dbenv, "%s", DB_GLOBAL(db_line)); + __db_msg(dbenv, "DB_REPMGR site information:"); + + for (i = 0; i < count; ++i) { + __db_msg(dbenv, "%s (eid: %d, port: %u, %sconnected)", + list[i].host, list[i].eid, list[i].port, + list[i].status == DB_REPMGR_CONNECTED ? "" : "dis"); + } + + __os_ufree(dbenv, list); + + return (0); +} diff --git a/db/repmgr/repmgr_util.c b/db/repmgr/repmgr_util.c new file mode 100644 index 000000000..541308384 --- /dev/null +++ b/db/repmgr/repmgr_util.c @@ -0,0 +1,415 @@ +/*- + * See the file LICENSE for redistribution information. + * + * Copyright (c) 2005-2006 + * Oracle Corporation. All rights reserved. + * + * $Id: repmgr_util.c,v 1.27 2006/09/19 14:14:12 mjc Exp $ + */ + +#include "db_config.h" + +#define __INCLUDE_NETWORKING 1 +#include "db_int.h" + +/* + * Schedules a future attempt to re-establish a connection with the given site. + * Usually, we wait the configured retry_wait period. But if the "immediate" + * parameter is given as TRUE, we'll make the wait time 0, and put the request + * at the _beginning_ of the retry queue. Note how this allows us to preserve + * the property that the queue stays in time order simply by appending to the + * end. + * + * PUBLIC: int __repmgr_schedule_connection_attempt __P((DB_ENV *, u_int, int)); + * + * !!! + * Caller should hold mutex. + * + * Unless an error occurs, we always attempt to wake the main thread; + * __repmgr_bust_connection relies on this behavior. + */ +int +__repmgr_schedule_connection_attempt(dbenv, eid, immediate) + DB_ENV *dbenv; + u_int eid; + int immediate; +{ + DB_REP *db_rep; + REPMGR_SITE *site; + REPMGR_RETRY *retry; + repmgr_timeval_t t; + int ret; + + db_rep = dbenv->rep_handle; + if ((ret = __os_malloc(dbenv, sizeof(*retry), &retry)) != 0) + return (ret); + + __os_clock(dbenv, &t.tv_sec, &t.tv_usec); + if (immediate) + TAILQ_INSERT_HEAD(&db_rep->retries, retry, entries); + else { + if ((t.tv_usec += db_rep->connection_retry_wait%1000000) > + 1000000) { + t.tv_sec++; + t.tv_usec -= 1000000; + } + t.tv_sec += db_rep->connection_retry_wait/1000000; + TAILQ_INSERT_TAIL(&db_rep->retries, retry, entries); + } + retry->eid = eid; + memcpy(&retry->time, &t, sizeof(repmgr_timeval_t)); + + site = SITE_FROM_EID(eid); + site->state = SITE_IDLE; + site->ref.retry = retry; + + return (__repmgr_wake_main_thread(dbenv)); +} + +/* + * Initialize the necessary control structures to begin reading a new input + * message. + * + * PUBLIC: void __repmgr_reset_for_reading __P((REPMGR_CONNECTION *)); + */ +/* + * TODO: we need to make sure we call this: (1) initially, when we first create + * a connection; (2) after processing a message, to get ready to read the next + * one; and (3) after a connection gets successfully re-established after a + * failure. (Actually, #1 and #3 should probably end up being the same thing, + * if the code is organized properly.) + */ +void +__repmgr_reset_for_reading(con) + REPMGR_CONNECTION *con; +{ + con->reading_phase = SIZES_PHASE; + __repmgr_iovec_init(&con->iovecs); + __repmgr_add_buffer(&con->iovecs, &con->msg_type, + sizeof(con->msg_type)); + __repmgr_add_buffer(&con->iovecs, &con->control_size_buf, + sizeof(con->control_size_buf)); + __repmgr_add_buffer(&con->iovecs, &con->rec_size_buf, + sizeof(con->rec_size_buf)); +} + +/* + * Constructs a DB_REPMGR_CONNECTION structure, and puts it on the main list of + * connections. It does not initialize eid, since that isn't needed and/or + * immediately known in all cases. + * + * PUBLIC: int __repmgr_new_connection __P((DB_ENV *, REPMGR_CONNECTION **, + * PUBLIC: socket_t, u_int32_t)); + */ +int +__repmgr_new_connection(dbenv, connp, s, flags) + DB_ENV *dbenv; + REPMGR_CONNECTION **connp; + socket_t s; + u_int32_t flags; +{ + DB_REP *db_rep; + REPMGR_CONNECTION *c; + int ret; + + db_rep = dbenv->rep_handle; + if ((ret = __os_malloc(dbenv, sizeof(REPMGR_CONNECTION), &c)) != 0) + return (ret); + + c->fd = s; + c->flags = flags; + + STAILQ_INIT(&c->outbound_queue); + c->out_queue_length = 0; + + __repmgr_reset_for_reading(c); + TAILQ_INSERT_TAIL(&db_rep->connections, c, entries); + *connp = c; + + return (0); +} + +/* + * PUBLIC: int __repmgr_new_site __P((DB_ENV *, REPMGR_SITE**, + * PUBLIC: const repmgr_netaddr_t *, int)); + * + * !!! + * Caller must hold mutex. + */ +int +__repmgr_new_site(dbenv, sitep, addr, state) + DB_ENV *dbenv; + REPMGR_SITE **sitep; + const repmgr_netaddr_t *addr; + int state; +{ + DB_REP *db_rep; + REPMGR_SITE *site; + u_int new_site_max, eid; + int ret; +#ifdef DIAGNOSTIC + DB_MSGBUF mb; + SITE_STRING_BUFFER buffer; +#endif + + db_rep = dbenv->rep_handle; + if (db_rep->site_cnt >= db_rep->site_max) { +#define INITIAL_SITES_ALLOCATION 10 /* Arbitrary guess. */ + new_site_max = db_rep->site_max == 0 ? + INITIAL_SITES_ALLOCATION : db_rep->site_max * 2; + if ((ret = __os_realloc(dbenv, + sizeof(REPMGR_SITE) * new_site_max, &db_rep->sites)) != 0) + return (ret); + db_rep->site_max = new_site_max; + } + eid = db_rep->site_cnt++; + + site = &db_rep->sites[eid]; + + memcpy(&site->net_addr, addr, sizeof(*addr)); + ZERO_LSN(site->max_ack); + site->priority = -1; /* OOB value indicates we don't yet know. */ + site->state = state; + + RPRINT(dbenv, (dbenv, &mb, "EID %u is assigned for %s", eid, + __repmgr_format_site_loc(site, buffer))); + *sitep = site; + return (0); +} + +/* + * Destructor for a repmgr_netaddr_t, cleans up any allocated memory pointed to + * by the addr. + * + * PUBLIC: void __repmgr_cleanup_netaddr __P((DB_ENV *, repmgr_netaddr_t *)); + */ +void +__repmgr_cleanup_netaddr(dbenv, addr) + DB_ENV *dbenv; + repmgr_netaddr_t *addr; +{ + if (addr->address_list != NULL) { + __db_freeaddrinfo(dbenv, addr->address_list); + addr->address_list = addr->current = NULL; + } + if (addr->host != NULL) { + __os_free(dbenv, addr->host); + addr->host = NULL; + } +} + +/* + * PUBLIC: void __repmgr_iovec_init __P((REPMGR_IOVECS *)); + */ +void +__repmgr_iovec_init(v) + REPMGR_IOVECS *v; +{ + v->offset = v->count = 0; + v->total_bytes = 0; +} + +/* + * PUBLIC: void __repmgr_add_buffer __P((REPMGR_IOVECS *, void *, size_t)); + * + * !!! + * There is no checking for overflow of the vectors[5] array. + */ +void +__repmgr_add_buffer(v, address, length) + REPMGR_IOVECS *v; + void *address; + size_t length; +{ + v->vectors[v->count].iov_base = address; + v->vectors[v->count++].iov_len = length; + v->total_bytes += length; +} + +/* + * PUBLIC: void __repmgr_add_dbt __P((REPMGR_IOVECS *, const DBT *)); + */ +void +__repmgr_add_dbt(v, dbt) + REPMGR_IOVECS *v; + const DBT *dbt; +{ + v->vectors[v->count].iov_base = dbt->data; + v->vectors[v->count++].iov_len = dbt->size; + v->total_bytes += dbt->size; +} + +/* + * Update a set of iovecs to reflect the number of bytes transferred in an I/O + * operation, so that the iovecs can be used to continue transferring where we + * left off. + * Returns TRUE if the set of buffers is now fully consumed, FALSE if more + * remains. + * + * PUBLIC: int __repmgr_update_consumed __P((REPMGR_IOVECS *, size_t)); + */ +int +__repmgr_update_consumed(v, byte_count) + REPMGR_IOVECS *v; + size_t byte_count; +{ + db_iovec_t *iov; + int i; + + for (i = v->offset; ; i++) { + DB_ASSERT(NULL, i < v->count && byte_count > 0); + iov = &v->vectors[i]; + if (byte_count > iov->iov_len) { + /* + * We've consumed (more than) this vector's worth. + * Adjust count and continue. + */ + byte_count -= iov->iov_len; + } else { + /* Adjust length of remaining portion of vector. */ + iov->iov_len -= byte_count; + if (iov->iov_len > 0) { + /* + * Still some left in this vector. Adjust base + * address too, and leave offset pointing here. + */ + iov->iov_base = (void *) + ((u_int8_t *)iov->iov_base + byte_count); + v->offset = i; + } else { + /* + * Consumed exactly to a vector boundary. + * Advance to next vector for next time. + */ + v->offset = i+1; + } + /* + * If offset has reached count, the entire thing is + * consumed. + */ + return (v->offset >= v->count); + } + } +} + +/* + * Builds a buffer containing our network address information, suitable for + * publishing as cdata via a call to rep_start, and sets up the given DBT to + * point to it. The buffer is dynamically allocated memory, and the caller must + * assume responsibility for it. + * + * PUBLIC: int __repmgr_prepare_my_addr __P((DB_ENV *, DBT *)); + */ +int +__repmgr_prepare_my_addr(dbenv, dbt) + DB_ENV *dbenv; + DBT *dbt; +{ + DB_REP *db_rep; + size_t size, hlen; + u_int16_t port_buffer; + u_int8_t *ptr; + int ret; + + db_rep = dbenv->rep_handle; + + /* + * The cdata message consists of the 2-byte port number, in network byte + * order, followed by the null-terminated host name string. + */ + port_buffer = htons(db_rep->my_addr.port); + size = sizeof(port_buffer) + + (hlen = strlen(db_rep->my_addr.host) + 1); + if ((ret = __os_malloc(dbenv, size, &ptr)) != 0) + return (ret); + + DB_INIT_DBT(*dbt, ptr, size); + + memcpy(ptr, &port_buffer, sizeof(port_buffer)); + ptr = &ptr[sizeof(port_buffer)]; + memcpy(ptr, db_rep->my_addr.host, hlen); + + return (0); +} + +/* + * PUBLIC: int __repmgr_timeval_cmp + * PUBLIC: __P((repmgr_timeval_t *, repmgr_timeval_t *)); + */ +int +__repmgr_timeval_cmp(a, b) + repmgr_timeval_t *a, *b; +{ + if (a->tv_sec == b->tv_sec) { + if (a->tv_usec == b->tv_usec) + return (0); + else + return (a->tv_usec < b->tv_usec ? -1 : 1); + } else + return (a->tv_sec < b->tv_sec ? -1 : 1); +} + +/* + * Provide the appropriate value for nsites, the number of sites in the + * replication group. If the application has specified a value, use that. + * Otherwise, just use the number of sites we know of. + * + * PUBLIC: u_int __repmgr_get_nsites __P((DB_REP *)); + */ +u_int +__repmgr_get_nsites(db_rep) + DB_REP *db_rep; +{ + if (db_rep->config_nsites > 0) + return ((u_int)db_rep->config_nsites); + + /* + * The number of other sites in our table, plus 1 to count ourself. + */ + return (db_rep->site_cnt + 1); +} + +/* + * PUBLIC: void __repmgr_thread_failure __P((DB_ENV *, int)); + */ +void +__repmgr_thread_failure(dbenv, why) + DB_ENV *dbenv; + int why; +{ + (void)__repmgr_stop_threads(dbenv); + (void)__db_panic(dbenv, why); +} + +/* + * Format a printable representation of a site location, suitable for inclusion + * in an error message. The buffer must be at least as big as + * MAX_SITE_LOC_STRING. + * + * PUBLIC: char *__repmgr_format_eid_loc __P((DB_REP *, int, char *)); + */ +char * +__repmgr_format_eid_loc(db_rep, eid, buffer) + DB_REP *db_rep; + int eid; + char *buffer; +{ + if (IS_VALID_EID(eid)) + return (__repmgr_format_site_loc(SITE_FROM_EID(eid), buffer)); + + snprintf(buffer, MAX_SITE_LOC_STRING, "(unidentified site)"); + return (buffer); +} + +/* + * PUBLIC: char *__repmgr_format_site_loc __P((REPMGR_SITE *, char *)); + */ +char * +__repmgr_format_site_loc(site, buffer) + REPMGR_SITE *site; + char *buffer; +{ + snprintf(buffer, MAX_SITE_LOC_STRING, "site %s:%lu", + site->net_addr.host, (u_long)site->net_addr.port); + return (buffer); +} diff --git a/db/repmgr/repmgr_windows.c b/db/repmgr/repmgr_windows.c new file mode 100644 index 000000000..20c290665 --- /dev/null +++ b/db/repmgr/repmgr_windows.c @@ -0,0 +1,708 @@ +/*- + * See the file LICENSE for redistribution information. + * + * Copyright (c) 2005-2006 + * Oracle Corporation. All rights reserved. + * + * $Id: repmgr_windows.c,v 1.14 2006/09/11 15:15:20 bostic Exp $ + */ + +#include "db_config.h" + +#define __INCLUDE_NETWORKING 1 +#include "db_int.h" + +typedef struct __ack_waiter { + HANDLE event; + const DB_LSN *lsnp; + struct __ack_waiter *next_free; +} ACK_WAITER; + +#define WAITER_SLOT_IN_USE(w) ((w)->lsnp != NULL) + +/* + * Array slots [0:next_avail-1] are initialized, and either in use or on the + * free list. Slots beyond that are virgin territory, whose memory contents + * could be garbage. In particular, note that slots [0:next_avail-1] have a + * Win32 Event Object created for them, which have to be freed when cleaning up + * this data structure. + * + * "first_free" points to a list of not-in-use slots threaded through the first + * section of the array. + */ +struct __ack_waiters_table { + struct __ack_waiter *array; + int size; + int next_avail; + struct __ack_waiter *first_free; +}; + +static int allocate_wait_slot __P((DB_ENV *, ACK_WAITER **)); +static void free_wait_slot __P((DB_ENV *, ACK_WAITER *)); +static int handle_completion __P((DB_ENV *, REPMGR_CONNECTION *)); +static int finish_connecting __P((DB_ENV *, REPMGR_CONNECTION *, + LPWSANETWORKEVENTS)); + +int +__repmgr_thread_start(dbenv, runnable) + DB_ENV *dbenv; + REPMGR_RUNNABLE *runnable; +{ + HANDLE thread_id; + + runnable->finished = FALSE; + + thread_id = CreateThread(NULL, 0, + (LPTHREAD_START_ROUTINE)runnable->run, dbenv, 0, NULL); + if (thread_id == NULL) + return (GetLastError()); + runnable->thread_id = thread_id; + return (0); +} + +int +__repmgr_thread_join(thread) + REPMGR_RUNNABLE *thread; +{ + if (WaitForSingleObject(thread->thread_id, INFINITE) == WAIT_OBJECT_0) + return (0); + return (GetLastError()); +} + +int __repmgr_set_nonblocking(s) + SOCKET s; +{ + int ret; + u_long arg; + + arg = 1; /* any non-zero value */ + if ((ret = ioctlsocket(s, FIONBIO, &arg)) == SOCKET_ERROR) + return (WSAGetLastError()); + return (0); +} + +/* + * Wake any send()-ing threads waiting for an acknowledgement. + * + * !!! + * Caller must hold the repmgr->mutex, if this thread synchronization is to work + * properly. + */ +int +__repmgr_wake_waiting_senders(dbenv) + DB_ENV *dbenv; +{ + DB_REP *db_rep; + ACK_WAITER *slot; + int i, ret; + + ret = 0; + db_rep = dbenv->rep_handle; + for (i=0; i<db_rep->waiters->next_avail; i++) { + slot = &db_rep->waiters->array[i]; + if (!WAITER_SLOT_IN_USE(slot)) + continue; + if (__repmgr_is_permanent(dbenv, slot->lsnp)) + if (!SetEvent(slot->event) && ret == 0) + ret = GetLastError(); + } + return (ret); +} + +/* + * !!! + * Caller must hold mutex. + */ +int +__repmgr_await_ack(dbenv, lsnp) + DB_ENV *dbenv; + const DB_LSN *lsnp; +{ + DB_REP *db_rep; + ACK_WAITER *me; + DWORD ret; + DWORD timeout; + + db_rep = dbenv->rep_handle; + + if ((ret = allocate_wait_slot(dbenv, &me)) != 0) + goto err; + + /* convert time-out from microseconds to milliseconds, rounding up */ + timeout = db_rep->ack_timeout > 0 ? ((db_rep->ack_timeout+999) / 1000) : + INFINITE; + me->lsnp = lsnp; + if ((ret = SignalObjectAndWait(db_rep->mutex, me->event, timeout, + FALSE)) == WAIT_FAILED) { + ret = GetLastError(); + } else if (ret == WAIT_TIMEOUT) + ret = DB_REP_UNAVAIL; + else + DB_ASSERT(dbenv, ret == WAIT_OBJECT_0); + + LOCK_MUTEX(db_rep->mutex); + free_wait_slot(dbenv, me); + +err: + return (ret); +} + +/* + * !!! + * Caller must hold the mutex. + */ +static int +allocate_wait_slot(dbenv, resultp) + DB_ENV *dbenv; + ACK_WAITER **resultp; +{ + DB_REP *db_rep; + ACK_WAITERS_TABLE *table; + ACK_WAITER *w; + int ret; + + db_rep = dbenv->rep_handle; + table = db_rep->waiters; + if (table->first_free == NULL) { + if (table->next_avail >= table->size) { + /* + * Grow the array. + */ + table->size *= 2; + w = table->array; + if ((ret = __os_realloc(dbenv, table->size * sizeof(*w), + &w)) != 0) + return (ret); + table->array = w; + } + /* + * Here if, one way or another, we're good to go for using the + * next slot (for the first time). + */ + w = &table->array[table->next_avail++]; + if ((w->event = CreateEvent(NULL, FALSE, FALSE, NULL)) == + NULL) { + /* + * Maintain the sanctity of our rule that + * [0:next_avail-1] contain valid Event Objects. + */ + --table->next_avail; + return (GetLastError()); + } + } else { + w = table->first_free; + table->first_free = w->next_free; + } + *resultp = w; + return (0); +} + +static void +free_wait_slot(dbenv, slot) + DB_ENV *dbenv; + ACK_WAITER *slot; +{ + DB_REP *db_rep; + + db_rep = dbenv->rep_handle; + + slot->lsnp = NULL; /* show it's not in use */ + slot->next_free = db_rep->waiters->first_free; + db_rep->waiters->first_free = slot; +} + +/* + * Make resource allocation an all-or-nothing affair, outside of this and the + * close_sync function. db_rep->waiters should be non-NULL iff all of these + * resources have been created. + */ +int +__repmgr_init_sync(dbenv, db_rep) + DB_ENV *dbenv; + DB_REP *db_rep; +{ +#define INITIAL_ALLOCATION 5 /* arbitrary size */ + ACK_WAITERS_TABLE *table; + int ret; + + db_rep->signaler = db_rep->queue_nonempty = db_rep->check_election = + db_rep->mutex = NULL; + table = NULL; + + if ((db_rep->signaler = CreateEvent(NULL, /* security attr */ + FALSE, /* (not) of the manual reset variety */ + FALSE, /* (not) initially signaled */ + NULL)) == NULL) /* name */ + goto geterr; + + if ((db_rep->queue_nonempty = CreateEvent(NULL, TRUE, FALSE, NULL)) + == NULL) + goto geterr; + + if ((db_rep->check_election = CreateEvent(NULL, FALSE, FALSE, NULL)) + == NULL) + goto geterr; + + if ((db_rep->mutex = CreateMutex(NULL, FALSE, NULL)) == NULL) + goto geterr; + + if ((ret = __os_calloc(dbenv, 1, sizeof(ACK_WAITERS_TABLE), &table)) + != 0) + goto err; + + if ((ret = __os_calloc(dbenv, INITIAL_ALLOCATION, sizeof(ACK_WAITER), + &table->array)) != 0) + goto err; + + table->size = INITIAL_ALLOCATION; + table->first_free = NULL; + table->next_avail = 0; + + /* There's a restaurant joke in there somewhere. */ + db_rep->waiters = table; + return (0); + +geterr: + ret = GetLastError(); +err: + if (db_rep->check_election != NULL) + CloseHandle(db_rep->check_election); + if (db_rep->queue_nonempty != NULL) + CloseHandle(db_rep->queue_nonempty); + if (db_rep->signaler != NULL) + CloseHandle(db_rep->signaler); + if (db_rep->mutex != NULL) + CloseHandle(db_rep->mutex); + if (table != NULL) + __os_free(dbenv, table); + db_rep->waiters = NULL; + return (ret); +} + +int +__repmgr_close_sync(dbenv) + DB_ENV *dbenv; +{ + DB_REP *db_rep; + int i, ret; + + db_rep = dbenv->rep_handle; + if (!(REPMGR_SYNC_INITED(db_rep))) + return (0); + + ret = 0; + for (i = 0; i < db_rep->waiters->next_avail; i++) { + if (!CloseHandle(db_rep->waiters->array[i].event) && ret == 0) + ret = GetLastError(); + } + __os_free(dbenv, db_rep->waiters->array); + __os_free(dbenv, db_rep->waiters); + + if (!CloseHandle(db_rep->check_election) && ret == 0) + ret = GetLastError(); + + if (!CloseHandle(db_rep->queue_nonempty) && ret == 0) + ret = GetLastError(); + + if (!CloseHandle(db_rep->signaler) && ret == 0) + ret = GetLastError(); + + if (!CloseHandle(db_rep->mutex) && ret == 0) + ret = GetLastError(); + + db_rep->waiters = NULL; + return (ret); +} + +/* + * Performs net-related resource initialization other than memory initialization + * and allocation. A valid db_rep->listen_fd acts as the "all-or-nothing" + * sentinel signifying that these resources are allocated (except that now the + * new wsa_inited flag may be used to indicate that WSAStartup has already been + * called). + */ +int +__repmgr_net_init(dbenv, db_rep) + DB_ENV *dbenv; + DB_REP *db_rep; +{ + int ret; + + /* Initialize the Windows sockets DLL. */ + if (!db_rep->wsa_inited && (ret = __repmgr_wsa_init(dbenv)) != 0) + goto err; + + if ((ret = __repmgr_listen(dbenv)) == 0) + return (0); + + if (WSACleanup() == SOCKET_ERROR) { + ret = net_errno; + __db_err(dbenv, ret, "WSACleanup"); + } + +err: db_rep->listen_fd = INVALID_SOCKET; + return (ret); +} + +/* + * __repmgr_wsa_init -- + * Initialize the Windows sockets DLL. + * + * PUBLIC: int __repmgr_wsa_init __P((DB_ENV *)); + */ +int +__repmgr_wsa_init(dbenv) + DB_ENV *dbenv; +{ + DB_REP *db_rep; + WSADATA wsaData; + int ret; + + db_rep = dbenv->rep_handle; + + if ((ret = WSAStartup(MAKEWORD(2, 2), &wsaData)) != 0) { + __db_err(dbenv, ret, "unable to initialize Windows networking"); + return (ret); + } + db_rep->wsa_inited = TRUE; + + return (0); +} + +int +__repmgr_lock_mutex(mutex) + mgr_mutex_t *mutex; +{ + if (WaitForSingleObject(*mutex, INFINITE) == WAIT_OBJECT_0) + return (0); + return (GetLastError()); +} + +int +__repmgr_unlock_mutex(mutex) + mgr_mutex_t *mutex; +{ + if (ReleaseMutex(*mutex)) + return (0); + return (GetLastError()); +} + +int +__repmgr_signal(v) + cond_var_t *v; +{ + return (SetEvent(*v) ? 0 : GetLastError()); +} + +int +__repmgr_wake_main_thread(dbenv) + DB_ENV *dbenv; +{ + if (!SetEvent(dbenv->rep_handle->signaler)) + return (GetLastError()); + return (0); +} + +int +__repmgr_writev(fd, iovec, buf_count, byte_count_p) + socket_t fd; + db_iovec_t *iovec; + int buf_count; + size_t *byte_count_p; +{ + DWORD bytes; + + if (WSASend(fd, iovec, + (DWORD)buf_count, &bytes, 0, NULL, NULL) == SOCKET_ERROR) + return (net_errno); + + *byte_count_p = (size_t)bytes; + return (0); +} + +int +__repmgr_readv(fd, iovec, buf_count, xfr_count_p) + socket_t fd; + db_iovec_t *iovec; + int buf_count; + size_t *xfr_count_p; +{ + DWORD bytes, flags; + + flags = 0; + if (WSARecv(fd, iovec, + (DWORD)buf_count, &bytes, &flags, NULL, NULL) == SOCKET_ERROR) + return (net_errno); + + *xfr_count_p = (size_t)bytes; + return (0); +} + +/* + * Calculate the time duration from now til "when", in the form of an integer + * (suitable for WSAWaitForMultipleEvents()), clipping the result at 0 (i.e., + * avoid a negative result). + */ +void +__repmgr_timeval_diff_current(dbenv, when, result) + DB_ENV *dbenv; + repmgr_timeval_t *when; + select_timeout_t *result; +{ + repmgr_timeval_t now; + + __os_clock(dbenv, &now.tv_sec, &now.tv_usec); + if (__repmgr_timeval_cmp(when, &now) <= 0) + *result = 0; + else + *result = (when->tv_sec - now.tv_sec) * MS_PER_SEC + + (when->tv_usec - now.tv_usec) / USEC_PER_MS; +} + +int +__repmgr_select_loop(dbenv) + DB_ENV *dbenv; +{ + DB_REP *db_rep; + REPMGR_CONNECTION *conn, *next; + REPMGR_RETRY *retry; + select_timeout_t timeout; + WSAEVENT events[WSA_MAXIMUM_WAIT_EVENTS]; + REPMGR_CONNECTION *connections[WSA_MAXIMUM_WAIT_EVENTS]; + DWORD nevents, ret; + int flow_control, i; + WSAEVENT listen_event; + WSANETWORKEVENTS net_events; + + db_rep = dbenv->rep_handle; + + if ((listen_event = WSACreateEvent()) == WSA_INVALID_EVENT) { + __db_err( + dbenv, net_errno, "can't create event for listen socket"); + return (net_errno); + } + if (WSAEventSelect(db_rep->listen_fd, listen_event, FD_ACCEPT) == + SOCKET_ERROR) { + ret = net_errno; + __db_err(dbenv, ret, "can't enable event for listener"); + goto out; + } + + LOCK_MUTEX(db_rep->mutex); + if ((ret = __repmgr_first_try_connections(dbenv)) != 0) + goto unlock; + flow_control = FALSE; + for (;;) { + /* Start with the two events that we always wait for. */ + events[0] = db_rep->signaler; + events[1] = listen_event; + nevents = 2; + + /* + * Add an event for each surviving socket that we're interested + * in. (For now [until we implement flow control], that's all + * of them, in one form or another.) + * Note that even if we're suffering flow control, we + * nevertheless still read if we haven't even yet gotten a + * handshake. Why? (1) Handshakes are important; and (2) they + * don't hurt anything flow-control-wise. + * Loop just like TAILQ_FOREACH, except that we need to be + * able to unlink a list entry. + */ + for (conn = TAILQ_FIRST(&db_rep->connections); + conn != NULL; + conn = next) { + next = TAILQ_NEXT(conn, entries); + if (F_ISSET(conn, CONN_DEFUNCT)) { + __repmgr_cleanup_connection(dbenv, conn); + continue; + } + if (F_ISSET(conn, CONN_CONNECTING) || + !STAILQ_EMPTY(&conn->outbound_queue) || + (!flow_control || !IS_VALID_EID(conn->eid))) { + events[nevents] = conn->event_object; + connections[nevents++] = conn; + } + } + + /* + * Decide how long to wait based on when it will next be time to + * retry an idle connection. (List items are in order, so we + * only have to examine the first one.) + */ + if (TAILQ_EMPTY(&db_rep->retries)) + timeout = WSA_INFINITE; + else { + retry = TAILQ_FIRST(&db_rep->retries); + + __repmgr_timeval_diff_current( + dbenv, &retry->time, &timeout); + } + + UNLOCK_MUTEX(db_rep->mutex); + ret = WSAWaitForMultipleEvents(nevents, events, FALSE, timeout, + FALSE); + if (db_rep->finished) { + ret = 0; + goto out; + } + LOCK_MUTEX(db_rep->mutex); + + if (ret >= WSA_WAIT_EVENT_0 && + ret < WSA_WAIT_EVENT_0 + nevents) { + switch (i = ret - WSA_WAIT_EVENT_0) { + case 0: + /* Another thread woke us. */ + break; + case 1: + if ((ret = WSAEnumNetworkEvents( + db_rep->listen_fd, listen_event, + &net_events)) == SOCKET_ERROR) { + ret = net_errno; + goto unlock; + } + DB_ASSERT(dbenv, + net_events.lNetworkEvents & FD_ACCEPT); + if ((ret = net_events.iErrorCode[FD_ACCEPT_BIT]) + != 0) + goto unlock; + if ((ret = __repmgr_accept(dbenv)) != 0) + goto unlock; + break; + default: + if ((ret = handle_completion(dbenv, + connections[i])) != 0) + goto unlock; + break; + } + } else if (ret == WSA_WAIT_TIMEOUT) { + if ((ret = __repmgr_retry_connections(dbenv)) != 0) + goto unlock; + } else if (ret == WSA_WAIT_FAILED) { + ret = net_errno; + goto unlock; + } + } + +unlock: + UNLOCK_MUTEX(db_rep->mutex); +out: + if (!CloseHandle(listen_event) && ret == 0) + ret = GetLastError(); + return (ret); +} + +static int +handle_completion(dbenv, conn) + DB_ENV *dbenv; + REPMGR_CONNECTION *conn; +{ + int ret; + WSANETWORKEVENTS events; + + if ((ret = WSAEnumNetworkEvents(conn->fd, conn->event_object, &events)) + == SOCKET_ERROR) { + __db_err(dbenv, net_errno, "EnumNetworkEvents"); + ret = DB_REP_UNAVAIL; + goto err; + } + + if (F_ISSET(conn, CONN_CONNECTING)) { + if ((ret = finish_connecting(dbenv, conn, &events)) != 0) + goto err; + } else { /* Check both writing and reading. */ + if (events.lNetworkEvents & FD_CLOSE) { + __db_err(dbenv, + events.iErrorCode[FD_CLOSE_BIT], + "connection closed"); + ret = DB_REP_UNAVAIL; + goto err; + } + + if (events.lNetworkEvents & FD_WRITE) { + if (events.iErrorCode[FD_WRITE_BIT] != 0) { + __db_err(dbenv, + events.iErrorCode[FD_WRITE_BIT], + "error writing"); + ret = DB_REP_UNAVAIL; + goto err; + } else if ((ret = + __repmgr_write_some(dbenv, conn)) != 0) + goto err; + } + + if (events.lNetworkEvents & FD_READ) { + if (events.iErrorCode[FD_READ_BIT] != 0) { + __db_err(dbenv, + events.iErrorCode[FD_READ_BIT], + "error reading"); + ret = DB_REP_UNAVAIL; + goto err; + } else if ((ret = + __repmgr_read_from_site(dbenv, conn)) != 0) + goto err; + } + } + + return (0); + +err: if (ret == DB_REP_UNAVAIL) + return (__repmgr_bust_connection(dbenv, conn, TRUE)); + return (ret); +} + +static int +finish_connecting(dbenv, conn, events) + DB_ENV *dbenv; + REPMGR_CONNECTION *conn; + LPWSANETWORKEVENTS events; +{ + DB_REP *db_rep; + u_int eid; +/* char reason[100]; */ + int ret/*, t_ret*/; +/* DWORD_PTR values[1]; */ + + if (!(events->lNetworkEvents & FD_CONNECT)) { + /* TODO: Is this even possible? */ + return (0); + } + + F_CLR(conn, CONN_CONNECTING); + + if ((ret = events->iErrorCode[FD_CONNECT_BIT]) != 0) { +/* t_ret = FormatMessage( */ +/* FORMAT_MESSAGE_IGNORE_INSERTS | */ +/* FORMAT_MESSAGE_FROM_SYSTEM | */ +/* FORMAT_MESSAGE_ARGUMENT_ARRAY, */ +/* NULL, ret, 0, (LPTSTR)reason, sizeof(reason), values); */ +/* __db_err(dbenv/\*, ret*\/, "connecting: %s", */ +/* reason); */ +/* LocalFree(reason); */ + __db_err(dbenv, ret, "connecting"); + goto err; + } + + if (WSAEventSelect(conn->fd, conn->event_object, FD_READ | FD_CLOSE) == + SOCKET_ERROR) { + ret = net_errno; + __db_err(dbenv, ret, "setting event bits for reading"); + return (ret); + } + + return (__repmgr_send_handshake(dbenv, conn)); + +err: + db_rep = dbenv->rep_handle; + eid = conn->eid; + DB_ASSERT(dbenv, IS_VALID_EID(eid)); + + if (ADDR_LIST_NEXT(&SITE_FROM_EID(eid)->net_addr) == NULL) + return (DB_REP_UNAVAIL); + + DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections)); + __repmgr_cleanup_connection(dbenv, conn); + ret = __repmgr_connect_site(dbenv, eid); + DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL); + return (ret); +} |