diff options
author | Zhang Qiang <qiang.z.zhang@intel.com> | 2012-05-29 11:25:24 +0800 |
---|---|---|
committer | Zhang Qiang <qiang.z.zhang@intel.com> | 2012-05-29 11:25:24 +0800 |
commit | e776056ea09ba0b6d9505ced6913c9190a12d632 (patch) | |
tree | 092838f2a86042abc586aa5576e36ae6cb47e256 /rep/rep_util.c | |
parent | 2e082c838d2ca750f5daac6dcdabecc22dfd4e46 (diff) | |
download | db4-e776056ea09ba0b6d9505ced6913c9190a12d632.tar.gz db4-e776056ea09ba0b6d9505ced6913c9190a12d632.tar.bz2 db4-e776056ea09ba0b6d9505ced6913c9190a12d632.zip |
updated with Tizen:Base source codes
Diffstat (limited to 'rep/rep_util.c')
-rw-r--r-- | rep/rep_util.c | 2007 |
1 files changed, 0 insertions, 2007 deletions
diff --git a/rep/rep_util.c b/rep/rep_util.c deleted file mode 100644 index 8fbf3a0..0000000 --- a/rep/rep_util.c +++ /dev/null @@ -1,2007 +0,0 @@ -/*- - * See the file LICENSE for redistribution information. - * - * Copyright (c) 2001-2009 Oracle. All rights reserved. - * - * $Id$ - */ - -#include "db_config.h" - -#include "db_int.h" -#include "dbinc/db_page.h" -#include "dbinc/db_am.h" -#include "dbinc/log.h" -#include "dbinc/mp.h" -#include "dbinc/txn.h" - -#ifdef REP_DIAGNOSTIC -#include "dbinc/db_page.h" -#include "dbinc/fop.h" -#include "dbinc/btree.h" -#include "dbinc/hash.h" -#include "dbinc/qam.h" -#endif - -/* - * rep_util.c: - * Miscellaneous replication-related utility functions, including - * those called by other subsystems. - */ -#define TIMESTAMP_CHECK(env, ts, renv) do { \ - if (renv->op_timestamp != 0 && \ - renv->op_timestamp + DB_REGENV_TIMEOUT < ts) { \ - REP_SYSTEM_LOCK(env); \ - F_CLR(renv, DB_REGENV_REPLOCKED); \ - renv->op_timestamp = 0; \ - REP_SYSTEM_UNLOCK(env); \ - } \ -} while (0) - -static int __rep_lockout_int __P((ENV *, REP *, u_int32_t *, u_int32_t, - const char *, u_int32_t)); -static int __rep_newmaster_empty __P((ENV *, int)); -#ifdef REP_DIAGNOSTIC -static void __rep_print_logmsg __P((ENV *, const DBT *, DB_LSN *)); -#endif - -/* - * __rep_bulk_message -- - * This is a wrapper for putting a record into a bulk buffer. Since - * we have different bulk buffers, the caller must hand us the information - * we need to put the record into the correct buffer. All bulk buffers - * are protected by the REP->mtx_clientdb. - * - * PUBLIC: int __rep_bulk_message __P((ENV *, REP_BULK *, REP_THROTTLE *, - * PUBLIC: DB_LSN *, const DBT *, u_int32_t)); - */ -int -__rep_bulk_message(env, bulk, repth, lsn, dbt, flags) - ENV *env; - REP_BULK *bulk; - REP_THROTTLE *repth; - DB_LSN *lsn; - const DBT *dbt; - u_int32_t flags; -{ - DB_REP *db_rep; - REP *rep; - __rep_bulk_args b_args; - size_t len; - int ret; - u_int32_t recsize, typemore; - u_int8_t *p; - - db_rep = env->rep_handle; - rep = db_rep->region; - ret = 0; - - /* - * Figure out the total number of bytes needed for this record. - * !!! The marshalling code includes the given len, but also - * puts its own copy of the dbt->size with the DBT portion of - * the record. Account for that here. - */ - recsize = sizeof(len) + dbt->size + sizeof(DB_LSN) + sizeof(dbt->size); - - /* - * If *this* buffer is actively being transmitted, don't wait, - * just return so that it can be sent as a singleton. - */ - MUTEX_LOCK(env, rep->mtx_clientdb); - if (FLD_ISSET(*(bulk->flagsp), BULK_XMIT)) { - MUTEX_UNLOCK(env, rep->mtx_clientdb); - return (DB_REP_BULKOVF); - } - - /* - * If the record is bigger than the buffer entirely, send the - * current buffer and then return DB_REP_BULKOVF so that this - * record is sent as a singleton. Do we have enough info to - * do that here? XXX - */ - if (recsize > bulk->len) { - RPRINT(env, DB_VERB_REP_MSGS, (env, - "bulk_msg: Record %d (0x%x) larger than entire buffer 0x%x", - recsize, recsize, bulk->len)); - STAT(rep->stat.st_bulk_overflows++); - (void)__rep_send_bulk(env, bulk, flags); - /* - * XXX __rep_send_message... - */ - MUTEX_UNLOCK(env, rep->mtx_clientdb); - return (DB_REP_BULKOVF); - } - /* - * If this record doesn't fit, send the current buffer. - * Sending the buffer will reset the offset, but we will - * drop the mutex while sending so we need to keep checking - * if we're racing. - */ - while (recsize + *(bulk->offp) > bulk->len) { - RPRINT(env, DB_VERB_REP_MSGS, (env, - "bulk_msg: Record %lu (%#lx) doesn't fit. Send %lu (%#lx) now.", - (u_long)recsize, (u_long)recsize, - (u_long)bulk->len, (u_long)bulk->len)); - STAT(rep->stat.st_bulk_fills++); - if ((ret = __rep_send_bulk(env, bulk, flags)) != 0) { - MUTEX_UNLOCK(env, rep->mtx_clientdb); - return (ret); - } - } - - /* - * If we're using throttling, see if we are at the throttling - * limit before we do any more work here, by checking if the - * call to rep_send_throttle changed the repth->type to the - * *_MORE message type. If the throttling code hits the limit - * then we're done here. - */ - if (bulk->type == REP_BULK_LOG) - typemore = REP_LOG_MORE; - else - typemore = REP_PAGE_MORE; - if (repth != NULL) { - if ((ret = __rep_send_throttle(env, - bulk->eid, repth, REP_THROTTLE_ONLY, flags)) != 0) { - MUTEX_UNLOCK(env, rep->mtx_clientdb); - return (ret); - } - if (repth->type == typemore) { - RPRINT(env, DB_VERB_REP_MSGS, (env, - "bulk_msg: Record %lu (0x%lx) hit throttle limit.", - (u_long)recsize, (u_long)recsize)); - MUTEX_UNLOCK(env, rep->mtx_clientdb); - return (ret); - } - } - - /* - * Now we own the buffer, and we know our record fits into it. - * The buffer is structured with the len, LSN and then the record. - * Copy the record into the buffer. Then if we need to, - * send the buffer. - */ - p = bulk->addr + *(bulk->offp); - b_args.len = dbt->size; - b_args.lsn = *lsn; - b_args.bulkdata = *dbt; - /* - * If we're the first record, we need to save the first - * LSN in the bulk structure. - */ - if (*(bulk->offp) == 0) - bulk->lsn = *lsn; - if (rep->version < DB_REPVERSION_47) { - len = 0; - memcpy(p, &dbt->size, sizeof(dbt->size)); - p += sizeof(dbt->size); - memcpy(p, lsn, sizeof(DB_LSN)); - p += sizeof(DB_LSN); - memcpy(p, dbt->data, dbt->size); - p += dbt->size; - } else if ((ret = __rep_bulk_marshal(env, &b_args, p, - bulk->len, &len)) != 0) - goto err; - *(bulk->offp) = (uintptr_t)p + (uintptr_t)len - (uintptr_t)bulk->addr; - STAT(rep->stat.st_bulk_records++); - /* - * Send the buffer if it is a perm record or a force. - */ - if (LF_ISSET(REPCTL_PERM)) { - RPRINT(env, DB_VERB_REP_MSGS, (env, - "bulk_msg: Send buffer after copy due to PERM")); - ret = __rep_send_bulk(env, bulk, flags); - } -err: - MUTEX_UNLOCK(env, rep->mtx_clientdb); - return (ret); - -} - -/* - * __rep_send_bulk -- - * This function transmits the bulk buffer given. It assumes the - * caller holds the REP->mtx_clientdb. We may release it and reacquire - * it during this call. We will return with it held. - * - * PUBLIC: int __rep_send_bulk __P((ENV *, REP_BULK *, u_int32_t)); - */ -int -__rep_send_bulk(env, bulkp, ctlflags) - ENV *env; - REP_BULK *bulkp; - u_int32_t ctlflags; -{ - DBT dbt; - DB_REP *db_rep; - REP *rep; - int ret; - - /* - * If the offset is 0, we're done. There is nothing to send. - */ - if (*(bulkp->offp) == 0) - return (0); - - db_rep = env->rep_handle; - rep = db_rep->region; - - /* - * Set that this buffer is being actively transmitted. - */ - FLD_SET(*(bulkp->flagsp), BULK_XMIT); - DB_INIT_DBT(dbt, bulkp->addr, *(bulkp->offp)); - MUTEX_UNLOCK(env, rep->mtx_clientdb); - RPRINT(env, DB_VERB_REP_MSGS, (env, - "send_bulk: Send %d (0x%x) bulk buffer bytes", dbt.size, dbt.size)); - - /* - * Unlocked the mutex and now send the message. - */ - STAT(rep->stat.st_bulk_transfers++); - if ((ret = __rep_send_message(env, - bulkp->eid, bulkp->type, &bulkp->lsn, &dbt, ctlflags, 0)) != 0) - ret = DB_REP_UNAVAIL; - - MUTEX_LOCK(env, rep->mtx_clientdb); - /* - * Ready the buffer for further records. - */ - *(bulkp->offp) = 0; - FLD_CLR(*(bulkp->flagsp), BULK_XMIT); - return (ret); -} - -/* - * __rep_bulk_alloc -- - * This function allocates and initializes an internal bulk buffer. - * This is used by the master when fulfilling a request for a chunk of - * log records or a bunch of pages. - * - * PUBLIC: int __rep_bulk_alloc __P((ENV *, REP_BULK *, int, uintptr_t *, - * PUBLIC: u_int32_t *, u_int32_t)); - */ -int -__rep_bulk_alloc(env, bulkp, eid, offp, flagsp, type) - ENV *env; - REP_BULK *bulkp; - int eid; - uintptr_t *offp; - u_int32_t *flagsp, type; -{ - int ret; - - memset(bulkp, 0, sizeof(REP_BULK)); - *offp = *flagsp = 0; - bulkp->len = MEGABYTE; - if ((ret = __os_malloc(env, bulkp->len, &bulkp->addr)) != 0) - return (ret); - bulkp->offp = offp; - bulkp->type = type; - bulkp->eid = eid; - bulkp->flagsp = flagsp; - return (ret); -} - -/* - * __rep_bulk_free -- - * This function sends the remainder of the bulk buffer and frees it. - * - * PUBLIC: int __rep_bulk_free __P((ENV *, REP_BULK *, u_int32_t)); - */ -int -__rep_bulk_free(env, bulkp, flags) - ENV *env; - REP_BULK *bulkp; - u_int32_t flags; -{ - DB_REP *db_rep; - int ret; - - db_rep = env->rep_handle; - - MUTEX_LOCK(env, db_rep->region->mtx_clientdb); - ret = __rep_send_bulk(env, bulkp, flags); - MUTEX_UNLOCK(env, db_rep->region->mtx_clientdb); - __os_free(env, bulkp->addr); - return (ret); -} - -/* - * __rep_send_message -- - * This is a wrapper for sending a message. It takes care of constructing - * the control structure and calling the user's specified send function. - * - * PUBLIC: int __rep_send_message __P((ENV *, int, - * PUBLIC: u_int32_t, DB_LSN *, const DBT *, u_int32_t, u_int32_t)); - */ -int -__rep_send_message(env, eid, rtype, lsnp, dbt, ctlflags, repflags) - ENV *env; - int eid; - u_int32_t rtype; - DB_LSN *lsnp; - const DBT *dbt; - u_int32_t ctlflags, repflags; -{ - DBT cdbt, scrap_dbt; - DB_ENV *dbenv; - DB_LOG *dblp; - DB_REP *db_rep; - LOG *lp; - REP *rep; - REP_46_CONTROL cntrl46; - REP_OLD_CONTROL ocntrl; - __rep_control_args cntrl; - db_timespec msg_time; - int ret; - u_int32_t myflags; - u_int8_t buf[__REP_CONTROL_SIZE]; - size_t len; - - dbenv = env->dbenv; - db_rep = env->rep_handle; - rep = db_rep->region; - dblp = env->lg_handle; - lp = dblp->reginfo.primary; - ret = 0; - -#if defined(DEBUG_ROP) || defined(DEBUG_WOP) - if (db_rep->send == NULL) - return (0); -#endif - - /* Set up control structure. */ - memset(&cntrl, 0, sizeof(cntrl)); - memset(&ocntrl, 0, sizeof(ocntrl)); - memset(&cntrl46, 0, sizeof(cntrl46)); - if (lsnp == NULL) - ZERO_LSN(cntrl.lsn); - else - cntrl.lsn = *lsnp; - /* - * Set the rectype based on the version we need to speak. - */ - if (rep->version == DB_REPVERSION) - cntrl.rectype = rtype; - else if (rep->version < DB_REPVERSION) { - cntrl.rectype = __rep_msg_to_old(rep->version, rtype); - RPRINT(env, DB_VERB_REP_MSGS, (env, - "rep_send_msg: rtype %lu to version %lu record %lu.", - (u_long)rtype, (u_long)rep->version, - (u_long)cntrl.rectype)); - if (cntrl.rectype == REP_INVALID) - return (ret); - } else { - __db_errx(env, - "rep_send_message: Unknown rep version %lu, my version %lu", - (u_long)rep->version, (u_long)DB_REPVERSION); - return (__env_panic(env, EINVAL)); - } - cntrl.flags = ctlflags; - cntrl.rep_version = rep->version; - cntrl.log_version = lp->persist.version; - cntrl.gen = rep->gen; - - /* Don't assume the send function will be tolerant of NULL records. */ - if (dbt == NULL) { - memset(&scrap_dbt, 0, sizeof(DBT)); - dbt = &scrap_dbt; - } - - /* - * There are several types of records: commit and checkpoint records - * that affect database durability, regular log records that might - * be buffered on the master before being transmitted, and control - * messages which don't require the guarantees of permanency, but - * should not be buffered. - * - * There are request records that can be sent anywhere, and there - * are rerequest records that the app might want to send to the master. - */ - myflags = repflags; - if (FLD_ISSET(ctlflags, REPCTL_PERM)) - myflags |= DB_REP_PERMANENT; - else if (rtype != REP_LOG || FLD_ISSET(ctlflags, REPCTL_RESEND)) - myflags |= DB_REP_NOBUFFER; - - /* - * Let everyone know if we've been in an established group. - */ - if (F_ISSET(rep, REP_F_GROUP_ESTD)) - F_SET(&cntrl, REPCTL_GROUP_ESTD); - - /* - * We're sending messages to some other version. We cannot - * assume DB_REP_ANYWHERE is available. Turn it off. - */ - if (rep->version != DB_REPVERSION) - FLD_CLR(myflags, DB_REP_ANYWHERE); - - /* - * If we are a master sending a perm record, then set the - * REPCTL_LEASE flag to have the client reply. Also set - * the start time that the client will echo back to us. - * - * !!! If we are a master, using leases, we had better not be - * sending to an older version. - */ - if (IS_REP_MASTER(env) && IS_USING_LEASES(env) && - FLD_ISSET(ctlflags, REPCTL_PERM)) { - F_SET(&cntrl, REPCTL_LEASE); - DB_ASSERT(env, rep->version == DB_REPVERSION); - __os_gettime(env, &msg_time, 1); - cntrl.msg_sec = (u_int32_t)msg_time.tv_sec; - cntrl.msg_nsec = (u_int32_t)msg_time.tv_nsec; - } - - REP_PRINT_MESSAGE(env, eid, &cntrl, "rep_send_message", myflags); -#ifdef REP_DIAGNOSTIC - if (FLD_ISSET( - env->dbenv->verbose, DB_VERB_REP_MSGS) && rtype == REP_LOG) - __rep_print_logmsg(env, dbt, lsnp); -#endif - - /* - * If DB_REP_PERMANENT is set, the LSN better be non-zero. - */ - DB_ASSERT(env, !FLD_ISSET(myflags, DB_REP_PERMANENT) || - !IS_ZERO_LSN(cntrl.lsn)); - - /* - * If we're talking to an old version, send an old control structure. - */ - memset(&cdbt, 0, sizeof(cdbt)); - if (rep->version <= DB_REPVERSION_45) { - if (rep->version == DB_REPVERSION_45 && - F_ISSET(&cntrl, REPCTL_INIT)) { - F_CLR(&cntrl, REPCTL_INIT); - F_SET(&cntrl, REPCTL_INIT_45); - } - ocntrl.rep_version = cntrl.rep_version; - ocntrl.log_version = cntrl.log_version; - ocntrl.lsn = cntrl.lsn; - ocntrl.rectype = cntrl.rectype; - ocntrl.gen = cntrl.gen; - ocntrl.flags = cntrl.flags; - cdbt.data = &ocntrl; - cdbt.size = sizeof(ocntrl); - } else if (rep->version == DB_REPVERSION_46) { - cntrl46.rep_version = cntrl.rep_version; - cntrl46.log_version = cntrl.log_version; - cntrl46.lsn = cntrl.lsn; - cntrl46.rectype = cntrl.rectype; - cntrl46.gen = cntrl.gen; - cntrl46.msg_time.tv_sec = (time_t)cntrl.msg_sec; - cntrl46.msg_time.tv_nsec = (long)cntrl.msg_nsec; - cntrl46.flags = cntrl.flags; - cdbt.data = &cntrl46; - cdbt.size = sizeof(cntrl46); - } else { - (void)__rep_control_marshal(env, &cntrl, buf, - __REP_CONTROL_SIZE, &len); - DB_INIT_DBT(cdbt, buf, len); - } - - /* - * We set the LSN above to something valid. Give the master the - * actual LSN so that they can coordinate with permanent records from - * the client if they want to. - * - * !!! Even though we marshalled the control message for transmission, - * give the transport function the real LSN. - */ - ret = db_rep->send(dbenv, &cdbt, dbt, &cntrl.lsn, eid, myflags); - - /* - * We don't hold the rep lock, so this could miscount if we race. - * I don't think it's worth grabbing the mutex for that bit of - * extra accuracy. - */ - if (ret != 0) { - RPRINT(env, DB_VERB_REP_MSGS, (env, - "rep_send_function returned: %d", ret)); -#ifdef HAVE_STATISTICS - rep->stat.st_msgs_send_failures++; - } else - rep->stat.st_msgs_sent++; -#else - } -#endif - return (ret); -} - -#ifdef REP_DIAGNOSTIC -/* - * __rep_print_logmsg -- - * This is a debugging routine for printing out log records that - * we are about to transmit to a client. - */ -static void -__rep_print_logmsg(env, logdbt, lsnp) - ENV *env; - const DBT *logdbt; - DB_LSN *lsnp; -{ - static int first = 1; - static DB_DISTAB dtab; - - if (first) { - first = 0; - - (void)__bam_init_print(env, &dtab); - (void)__crdel_init_print(env, &dtab); - (void)__db_init_print(env, &dtab); - (void)__dbreg_init_print(env, &dtab); - (void)__fop_init_print(env, &dtab); - (void)__ham_init_print(env, &dtab); - (void)__qam_init_print(env, &dtab); - (void)__txn_init_print(env, &dtab); - } - - (void)__db_dispatch( - env, &dtab, (DBT *)logdbt, lsnp, DB_TXN_PRINT, NULL); -} -#endif - -/* - * __rep_new_master -- - * Called after a master election to sync back up with a new master. - * It's possible that we already know of this new master in which case - * we don't need to do anything. - * - * This is written assuming that this message came from the master; we - * need to enforce that in __rep_process_record, but right now, we have - * no way to identify the master. - * - * PUBLIC: int __rep_new_master __P((ENV *, __rep_control_args *, int)); - */ -int -__rep_new_master(env, cntrl, eid) - ENV *env; - __rep_control_args *cntrl; - int eid; -{ - DBT dbt; - DB_LOG *dblp; - DB_LOGC *logc; - DB_LSN first_lsn, lsn; - DB_REP *db_rep; - DB_THREAD_INFO *ip; - LOG *lp; - REGENV *renv; - REGINFO *infop; - REP *rep; - db_timeout_t lease_to; - u_int32_t unused; - int change, do_req, lockout, ret, t_ret; - - db_rep = env->rep_handle; - rep = db_rep->region; - dblp = env->lg_handle; - lp = dblp->reginfo.primary; - ret = 0; - logc = NULL; - lockout = 0; - REP_SYSTEM_LOCK(env); - change = rep->gen != cntrl->gen || rep->master_id != eid; - /* - * If we're hearing from a current or new master, then we - * want to clear EPHASE0 in case this site is waiting to - * hear from the master. - */ - F_CLR(rep, REP_F_EPHASE0); - if (change) { - /* - * If we are already locking out others, we're either - * in the middle of sync-up recovery or internal init - * when this newmaster comes in (we also lockout in - * rep_start, but we cannot be racing that because we - * don't allow rep_proc_msg when rep_start is going on). - * - * We're about to become the client of a new master. Since we - * want to be able to sync with the new master as quickly as - * possible, interrupt any STARTSYNC from the old master. The - * new master may need to rely on acks from us and the old - * STARTSYNC is now irrelevant. - * - * Note that, conveniently, the "lockout" flag defines the - * section of this code path during which both "message lockout" - * and "memp sync interrupt" are in effect. - */ - if (F_ISSET(rep, REP_F_READY_MSG)) - goto lckout; - - if ((ret = __rep_lockout_msg(env, rep, 1)) != 0) - goto errlck; - - (void)__memp_set_config(env->dbenv, DB_MEMP_SYNC_INTERRUPT, 1); - lockout = 1; - /* - * We must wait any remaining lease time before accepting - * this new master. This must be after the lockout above - * so that no new message can be processed and re-grant - * the lease out from under us. - */ - if (IS_USING_LEASES(env) && - ((lease_to = __rep_lease_waittime(env)) != 0)) { - REP_SYSTEM_UNLOCK(env); - __os_yield(env, 0, (u_long)lease_to); - REP_SYSTEM_LOCK(env); - F_SET(rep, REP_F_LEASE_EXPIRED); - } - - if ((ret = __env_init_rec(env, cntrl->log_version)) != 0) - goto errlck; - - REP_SYSTEM_UNLOCK(env); - - MUTEX_LOCK(env, rep->mtx_clientdb); - __os_gettime(env, &lp->rcvd_ts, 1); - lp->wait_ts = rep->request_gap; - ZERO_LSN(lp->verify_lsn); - ZERO_LSN(lp->prev_ckp); - ZERO_LSN(lp->waiting_lsn); - ZERO_LSN(lp->max_wait_lsn); - /* - * Open if we need to, in preparation for the truncate - * we'll do in a moment. - */ - if (db_rep->rep_db == NULL && - (ret = __rep_client_dbinit(env, 0, REP_DB)) != 0) { - MUTEX_UNLOCK(env, rep->mtx_clientdb); - goto err; - } - - /* - * If we were in the middle of an internal initialization - * and we've discovered a new master instead, clean up - * our old internal init information. We need to clean - * up any flags and unlock our lockout. - */ - REP_SYSTEM_LOCK(env); - if (F_ISSET(rep, REP_F_READY_API | REP_F_READY_OP)) { - ret = __rep_init_cleanup(env, rep, DB_FORCE); - /* - * Note that if an in-progress internal init was indeed - * "cleaned up", clearing these flags now will allow the - * application to see a completely empty database - * environment for a moment (until the master responds - * to our ALL_REQ). - */ - F_CLR(rep, REP_F_ABBREVIATED | REP_F_RECOVER_MASK); - } - MUTEX_UNLOCK(env, rep->mtx_clientdb); - if (ret != 0) { - /* TODO: consider add'l error recovery steps. */ - goto errlck; - } - ENV_GET_THREAD_INFO(env, ip); - if ((ret = __db_truncate(db_rep->rep_db, ip, NULL, &unused)) - != 0) - goto errlck; - rep->stat.st_log_queued = 0; - - /* - * This needs to be performed under message lockout - * if we're actually changing master. - */ - __rep_elect_done(env, rep, 1); - RPRINT(env, DB_VERB_REP_MISC, (env, - "Updating gen from %lu to %lu from master %d", - (u_long)rep->gen, (u_long)cntrl->gen, eid)); - rep->gen = cntrl->gen; - (void)__rep_write_gen(env, rep, rep->gen); - if (rep->egen <= rep->gen) - rep->egen = rep->gen + 1; - rep->master_id = eid; - STAT(rep->stat.st_master_changes++); - rep->stat.st_startup_complete = 0; - __log_set_version(env, cntrl->log_version); - rep->version = cntrl->rep_version; - RPRINT(env, DB_VERB_REP_MISC, (env, - "egen: %lu. rep version %lu", - (u_long)rep->egen, (u_long)rep->version)); - - /* - * If we're delaying client sync-up, we know we have a - * new/changed master now, set flag indicating we are - * actively delaying. - */ - if (FLD_ISSET(rep->config, REP_C_DELAYCLIENT)) - F_SET(rep, REP_F_DELAY); - F_SET(rep, REP_F_NOARCHIVE | REP_F_RECOVER_VERIFY); - F_CLR(rep, REP_F_READY_MSG); - (void)__memp_set_config(env->dbenv, DB_MEMP_SYNC_INTERRUPT, 0); - lockout = 0; - } else - __rep_elect_done(env, rep, 1); - REP_SYSTEM_UNLOCK(env); - - MUTEX_LOCK(env, rep->mtx_clientdb); - lsn = lp->ready_lsn; - - if (!change) { - ret = 0; - do_req = __rep_check_doreq(env, rep); - MUTEX_UNLOCK(env, rep->mtx_clientdb); - /* - * If there wasn't a change, we might still have some - * catching up or verification to do. - */ - if (do_req && - (F_ISSET(rep, REP_F_RECOVER_MASK) || - LOG_COMPARE(&lsn, &cntrl->lsn) < 0)) { - ret = __rep_resend_req(env, 0); - if (ret != 0) - RPRINT(env, DB_VERB_REP_MISC, (env, - "resend_req ret is %lu", (u_long)ret)); - } - /* - * If we're not in one of the recovery modes, we need to - * clear the NOARCHIVE flag. Elections set NOARCHIVE - * and if we called an election and found the same - * master, we need to clear NOARCHIVE here. - */ - if (!F_ISSET(rep, REP_F_RECOVER_MASK)) { - REP_SYSTEM_LOCK(env); - F_CLR(rep, REP_F_NOARCHIVE); - REP_SYSTEM_UNLOCK(env); - } - return (ret); - } - MUTEX_UNLOCK(env, rep->mtx_clientdb); - - /* - * If the master changed, we need to start the process of - * figuring out what our last valid log record is. However, - * if both the master and we agree that the max LSN is 0,0, - * then there is no recovery to be done. If we are at 0 and - * the master is not, then we just need to request all the log - * records from the master. - */ - if (IS_INIT_LSN(lsn) || IS_ZERO_LSN(lsn)) { - if ((ret = __rep_newmaster_empty(env, eid)) != 0) - goto err; - goto newmaster_complete; - } - - memset(&dbt, 0, sizeof(dbt)); - /* - * If this client is farther ahead on the log file than the master, see - * if there is any overlap in the logs. If not, the client is too - * far ahead of the master and the client will start over. - */ - if (cntrl->lsn.file < lsn.file) { - if ((ret = __log_cursor(env, &logc)) != 0) - goto err; - ret = __logc_get(logc, &first_lsn, &dbt, DB_FIRST); - if ((t_ret = __logc_close(logc)) != 0 && ret == 0) - ret = t_ret; - if (ret == DB_NOTFOUND) - goto notfound; - else if (ret != 0) - goto err; - if (cntrl->lsn.file < first_lsn.file) - goto notfound; - } - if ((ret = __log_cursor(env, &logc)) != 0) - goto err; - ret = __rep_log_backup(env, rep, logc, &lsn); - if ((t_ret = __logc_close(logc)) != 0 && ret == 0) - ret = t_ret; - if (ret == DB_NOTFOUND) - goto notfound; - else if (ret != 0) - goto err; - - /* - * Finally, we have a record to ask for. - */ - MUTEX_LOCK(env, rep->mtx_clientdb); - lp->verify_lsn = lsn; - __os_gettime(env, &lp->rcvd_ts, 1); - lp->wait_ts = rep->request_gap; - MUTEX_UNLOCK(env, rep->mtx_clientdb); - if (!F_ISSET(rep, REP_F_DELAY)) - (void)__rep_send_message(env, - eid, REP_VERIFY_REQ, &lsn, NULL, 0, DB_REP_ANYWHERE); - goto newmaster_complete; - -err: /* - * If we failed, we need to clear the flags we may have set above - * because we're not going to be setting the verify_lsn. - */ - REP_SYSTEM_LOCK(env); -errlck: if (lockout) { - F_CLR(rep, REP_F_READY_MSG); - (void)__memp_set_config(env->dbenv, DB_MEMP_SYNC_INTERRUPT, 0); - } - F_CLR(rep, REP_F_RECOVER_MASK | REP_F_DELAY); -lckout: REP_SYSTEM_UNLOCK(env); - return (ret); - -notfound: - /* - * If we don't have an identification record, we still - * might have some log records but we're discarding them - * to sync up with the master from the start. - * Therefore, truncate our log and treat it as if it - * were empty. In-memory logs can't be completely - * zeroed using __log_vtruncate, so just zero them out. - */ - RPRINT(env, DB_VERB_REP_MISC, - (env, "No commit or ckp found. Truncate log.")); - if (lp->db_log_inmemory) { - ZERO_LSN(lsn); - ret = __log_zero(env, &lsn); - } else { - INIT_LSN(lsn); - ret = __log_vtruncate(env, &lsn, &lsn, NULL); - } - if (ret != 0 && ret != DB_NOTFOUND) - return (ret); - infop = env->reginfo; - renv = infop->primary; - REP_SYSTEM_LOCK(env); - (void)time(&renv->rep_timestamp); - REP_SYSTEM_UNLOCK(env); - if ((ret = __rep_newmaster_empty(env, eid)) != 0) - goto err; -newmaster_complete: - return (DB_REP_NEWMASTER); -} - -/* - * __rep_newmaster_empty - * Handle the case of a NEWMASTER message received when we have an empty - * log. This requires internal init. If we can't do that because of - * NOAUTOINIT, return JOIN_FAILURE. If F_DELAY is in effect, don't even - * consider NOAUTOINIT yet, because they could change it before rep_sync call. - */ -static int -__rep_newmaster_empty(env, eid) - ENV *env; - int eid; -{ - DB_REP *db_rep; - LOG *lp; - REP *rep; - int msg, ret; - - db_rep = env->rep_handle; - rep = db_rep->region; - lp = env->lg_handle->reginfo.primary; - msg = ret = 0; - - MUTEX_LOCK(env, rep->mtx_clientdb); - REP_SYSTEM_LOCK(env); - lp->wait_ts = rep->request_gap; - - /* Usual case is to skip to UPDATE state; we may revise this below. */ - F_CLR(rep, REP_F_RECOVER_VERIFY); - F_SET(rep, REP_F_RECOVER_UPDATE); - - if (F_ISSET(rep, REP_F_DELAY)) { - /* - * Having properly set up wait_ts for later, nothing more to - * do now. - */ - } else if (FLD_ISSET(rep->config, REP_C_NOAUTOINIT)) { - F_CLR(rep, REP_F_NOARCHIVE | REP_F_RECOVER_MASK); - ret = DB_REP_JOIN_FAILURE; - } else { - /* Normal case: neither DELAY nor NOAUTOINIT. */ - msg = 1; - } - REP_SYSTEM_UNLOCK(env); - MUTEX_UNLOCK(env, rep->mtx_clientdb); - - if (msg) - (void)__rep_send_message(env, eid, REP_UPDATE_REQ, - NULL, NULL, 0, 0); - return (ret); -} - -/* - * __rep_noarchive - * Used by log_archive to determine if it is okay to remove - * log files. - * - * PUBLIC: int __rep_noarchive __P((ENV *)); - */ -int -__rep_noarchive(env) - ENV *env; -{ - DB_REP *db_rep; - REGENV *renv; - REGINFO *infop; - REP *rep; - time_t timestamp; - - infop = env->reginfo; - renv = infop->primary; - - /* - * This is tested before REP_ON below because we always need - * to obey if any replication process has disabled archiving. - * Everything is in the environment region that we need here. - */ - if (F_ISSET(renv, DB_REGENV_REPLOCKED)) { - (void)time(×tamp); - TIMESTAMP_CHECK(env, timestamp, renv); - /* - * Check if we're still locked out after checking - * the timestamp. - */ - if (F_ISSET(renv, DB_REGENV_REPLOCKED)) - return (EINVAL); - } - - if (!REP_ON(env)) - return (0); - - db_rep = env->rep_handle; - rep = db_rep->region; - return (F_ISSET(rep, REP_F_NOARCHIVE) ? 1 : 0); -} - -/* - * __rep_send_vote - * Send this site's vote for the election. - * - * PUBLIC: void __rep_send_vote __P((ENV *, DB_LSN *, u_int32_t, u_int32_t, - * PUBLIC: u_int32_t, u_int32_t, u_int32_t, int, u_int32_t, u_int32_t)); - */ -void -__rep_send_vote(env, lsnp, nsites, nvotes, pri, tie, egen, eid, vtype, flags) - ENV *env; - DB_LSN *lsnp; - int eid; - u_int32_t nsites, nvotes, pri; - u_int32_t flags, egen, tie, vtype; -{ - DB_REP *db_rep; - DBT vote_dbt; - REP *rep; - REP_OLD_VOTE_INFO ovi; - __rep_vote_info_args vi; - u_int8_t buf[__REP_VOTE_INFO_SIZE]; - size_t len; - - db_rep = env->rep_handle; - rep = db_rep->region; - - memset(&vi, 0, sizeof(vi)); - memset(&vote_dbt, 0, sizeof(vote_dbt)); - - /* - * In 4.7 we went to fixed sized fields. They may not be - * the same as the sizes in older versions. - */ - if (rep->version < DB_REPVERSION_47) { - memset(&ovi, 0, sizeof(ovi)); - ovi.egen = egen; - ovi.priority = (int) pri; - ovi.nsites = (int) nsites; - ovi.nvotes = (int) nvotes; - ovi.tiebreaker = tie; - vote_dbt.data = &ovi; - vote_dbt.size = sizeof(ovi); - } else { - vi.egen = egen; - vi.priority = pri; - vi.nsites = nsites; - vi.nvotes = nvotes; - vi.tiebreaker = tie; - (void)__rep_vote_info_marshal(env, &vi, buf, - __REP_VOTE_INFO_SIZE, &len); - DB_INIT_DBT(vote_dbt, buf, len); - } - - (void)__rep_send_message(env, eid, vtype, lsnp, &vote_dbt, flags, 0); -} - -/* - * __rep_elect_done - * Clear all election information for this site. Assumes the - * caller hold the region mutex. - * - * PUBLIC: void __rep_elect_done __P((ENV *, REP *, int)); - */ -void -__rep_elect_done(env, rep, found_master) - ENV *env; - REP *rep; - int found_master; -{ - int inelect; - db_timespec endtime; - - inelect = IN_ELECTION(rep); - F_CLR(rep, REP_F_EPHASE1 | REP_F_EPHASE2 | REP_F_TALLY); - /* - * Finding a master trumps finding a new egen. - */ - if (found_master) - F_CLR(rep, REP_F_EGENUPDATE); - rep->sites = 0; - rep->votes = 0; - if (inelect) { - if (timespecisset(&rep->etime)) { - __os_gettime(env, &endtime, 1); - timespecsub(&endtime, &rep->etime); -#ifdef HAVE_STATISTICS - rep->stat.st_election_sec = (u_int32_t)endtime.tv_sec; - rep->stat.st_election_usec = (u_int32_t) - (endtime.tv_nsec / NS_PER_US); -#endif - RPRINT(env, DB_VERB_REP_ELECT, (env, - "Election finished in %lu.%09lu sec", - (u_long)endtime.tv_sec, (u_long)endtime.tv_nsec)); - timespecclear(&rep->etime); - } - rep->egen++; - } - RPRINT(env, DB_VERB_REP_ELECT, - (env, "Election done; egen %lu", (u_long)rep->egen)); -} - -/* - * __env_rep_enter -- - * - * Check if we are in the middle of replication initialization and/or - * recovery, and if so, disallow operations. If operations are allowed, - * increment handle-counts, so that we do not start recovery while we - * are operating in the library. - * - * PUBLIC: int __env_rep_enter __P((ENV *, int)); - */ -int -__env_rep_enter(env, checklock) - ENV *env; - int checklock; -{ - DB_REP *db_rep; - REGENV *renv; - REGINFO *infop; - REP *rep; - int cnt; - time_t timestamp; - - /* Check if locks have been globally turned off. */ - if (F_ISSET(env->dbenv, DB_ENV_NOLOCKING)) - return (0); - - db_rep = env->rep_handle; - rep = db_rep->region; - - infop = env->reginfo; - renv = infop->primary; - if (checklock && F_ISSET(renv, DB_REGENV_REPLOCKED)) { - (void)time(×tamp); - TIMESTAMP_CHECK(env, timestamp, renv); - /* - * Check if we're still locked out after checking - * the timestamp. - */ - if (F_ISSET(renv, DB_REGENV_REPLOCKED)) - return (EINVAL); - } - - REP_SYSTEM_LOCK(env); - for (cnt = 0; F_ISSET(rep, REP_F_READY_API);) { - REP_SYSTEM_UNLOCK(env); - /* - * We're spinning - environment may be hung. Check if - * recovery has been initiated. - */ - PANIC_CHECK(env); - if (FLD_ISSET(rep->config, REP_C_NOWAIT)) { - __db_errx(env, - "Operation locked out. Waiting for replication lockout to complete"); - return (DB_REP_LOCKOUT); - } - __os_yield(env, 1, 0); - REP_SYSTEM_LOCK(env); - if (++cnt % 60 == 0) - __db_errx(env, - "DB_ENV handle waiting %d minutes for replication lockout to complete", - cnt / 60); - } - rep->handle_cnt++; - REP_SYSTEM_UNLOCK(env); - - return (0); -} - -/* - * __env_db_rep_exit -- - * - * Decrement handle count upon routine exit. - * - * PUBLIC: int __env_db_rep_exit __P((ENV *)); - */ -int -__env_db_rep_exit(env) - ENV *env; -{ - DB_REP *db_rep; - REP *rep; - - /* Check if locks have been globally turned off. */ - if (F_ISSET(env->dbenv, DB_ENV_NOLOCKING)) - return (0); - - db_rep = env->rep_handle; - rep = db_rep->region; - - REP_SYSTEM_LOCK(env); - rep->handle_cnt--; - REP_SYSTEM_UNLOCK(env); - - return (0); -} - -/* - * __db_rep_enter -- - * Called in replicated environments to keep track of in-use handles - * and prevent any concurrent operation during recovery. If checkgen is - * non-zero, then we verify that the dbp has the same handle as the env. - * - * If return_now is non-zero, we'll return DB_DEADLOCK immediately, else we'll - * sleep before returning DB_DEADLOCK. Without the sleep, it is likely - * the application will immediately try again and could reach a retry - * limit before replication has a chance to finish. The sleep increases - * the probability that an application retry will succeed. - * - * Typically calls with txns set return_now so that we return immediately. - * We want to return immediately because we want the txn to abort ASAP - * so that the lockout can proceed. - * - * PUBLIC: int __db_rep_enter __P((DB *, int, int, int)); - */ -int -__db_rep_enter(dbp, checkgen, checklock, return_now) - DB *dbp; - int checkgen, checklock, return_now; -{ - DB_REP *db_rep; - ENV *env; - REGENV *renv; - REGINFO *infop; - REP *rep; - time_t timestamp; - - env = dbp->env; - /* Check if locks have been globally turned off. */ - if (F_ISSET(env->dbenv, DB_ENV_NOLOCKING)) - return (0); - - db_rep = env->rep_handle; - rep = db_rep->region; - infop = env->reginfo; - renv = infop->primary; - - if (checklock && F_ISSET(renv, DB_REGENV_REPLOCKED)) { - (void)time(×tamp); - TIMESTAMP_CHECK(env, timestamp, renv); - /* - * Check if we're still locked out after checking - * the timestamp. - */ - if (F_ISSET(renv, DB_REGENV_REPLOCKED)) - return (EINVAL); - } - REP_SYSTEM_LOCK(env); - /* - * !!! - * Note, we are checking REP_F_READY_OP, but we are - * incrementing rep->handle_cnt. That seems like a mismatch, - * but the intention is to return DEADLOCK to the application - * which will cause them to abort the txn quickly and allow - * the lockout to proceed. - * - * The correctness of doing this depends on the fact that - * lockout of the API always sets REP_F_READY_OP first. - */ - if (F_ISSET(rep, REP_F_READY_OP)) { - REP_SYSTEM_UNLOCK(env); - if (!return_now) - __os_yield(env, 5, 0); - return (DB_LOCK_DEADLOCK); - } - - if (checkgen && dbp->timestamp != renv->rep_timestamp) { - REP_SYSTEM_UNLOCK(env); - __db_errx(env, "%s %s", - "replication recovery unrolled committed transactions;", - "open DB and DBcursor handles must be closed"); - return (DB_REP_HANDLE_DEAD); - } - rep->handle_cnt++; - REP_SYSTEM_UNLOCK(env); - - return (0); -} - -/* - * __op_rep_enter -- - * - * Check if we are in the middle of replication initialization and/or - * recovery, and if so, disallow new multi-step operations, such as - * transaction and memp gets. If operations are allowed, - * increment the op_cnt, so that we do not start recovery while we have - * active operations. - * - * PUBLIC: int __op_rep_enter __P((ENV *)); - */ -int -__op_rep_enter(env) - ENV *env; -{ - DB_REP *db_rep; - REP *rep; - int cnt; - - /* Check if locks have been globally turned off. */ - if (F_ISSET(env->dbenv, DB_ENV_NOLOCKING)) - return (0); - - db_rep = env->rep_handle; - rep = db_rep->region; - - REP_SYSTEM_LOCK(env); - for (cnt = 0; F_ISSET(rep, REP_F_READY_OP);) { - REP_SYSTEM_UNLOCK(env); - /* - * We're spnning - enironment may be hung. Check if - * recovery has been initiated. - */ - PANIC_CHECK(env); - if (FLD_ISSET(rep->config, REP_C_NOWAIT)) { - __db_errx(env, - "Operation locked out. Waiting for replication lockout to complete"); - return (DB_REP_LOCKOUT); - } - __os_yield(env, 5, 0); - cnt += 5; - REP_SYSTEM_LOCK(env); - if (cnt % 60 == 0) - __db_errx(env, - "__op_rep_enter waiting %d minutes for lockout to complete", - cnt / 60); - } - rep->op_cnt++; - REP_SYSTEM_UNLOCK(env); - - return (0); -} - -/* - * __op_rep_exit -- - * - * Decrement op count upon transaction commit/abort/discard or - * memp_fput. - * - * PUBLIC: int __op_rep_exit __P((ENV *)); - */ -int -__op_rep_exit(env) - ENV *env; -{ - DB_REP *db_rep; - REP *rep; - - /* Check if locks have been globally turned off. */ - if (F_ISSET(env->dbenv, DB_ENV_NOLOCKING)) - return (0); - - db_rep = env->rep_handle; - rep = db_rep->region; - - REP_SYSTEM_LOCK(env); - DB_ASSERT(env, rep->op_cnt > 0); - rep->op_cnt--; - REP_SYSTEM_UNLOCK(env); - - return (0); -} - -/* - * __rep_lockout_api -- - * Coordinate with other threads in the library and active txns so - * that we can run single-threaded, for recovery or internal backup. - * Assumes the caller holds the region mutex. - * - * PUBLIC: int __rep_lockout_api __P((ENV *, REP *)); - */ -int -__rep_lockout_api(env, rep) - ENV *env; - REP *rep; -{ - int ret; - - /* - * We must drain long-running operations first. We check - * REP_F_READY_OP in __db_rep_enter in order to allow them - * to abort existing txns quickly. Therefore, we must - * always lockout REP_F_READY_OP first, then REP_F_READY_API. - */ - if ((ret = __rep_lockout_int(env, rep, &rep->op_cnt, 0, - "op_cnt", REP_F_READY_OP)) != 0) - return (ret); - return (__rep_lockout_int(env, rep, &rep->handle_cnt, 0, - "handle_cnt", REP_F_READY_API)); -} - -/* - * __rep_lockout_apply -- - * Coordinate with other threads processing messages so that - * we can run single-threaded and know that no incoming - * message can apply new log records. - * This call should be short-term covering a specific critical - * operation where we need to make sure no new records change - * the log. Currently used to coordinate with elections. - * Assumes the caller holds the region mutex. - * - * PUBLIC: int __rep_lockout_apply __P((ENV *, REP *, u_int32_t)); - */ -int -__rep_lockout_apply(env, rep, apply_th) - ENV *env; - REP *rep; - u_int32_t apply_th; -{ - return (__rep_lockout_int(env, rep, &rep->apply_th, apply_th, - "apply_th", REP_F_READY_APPLY)); -} - -/* - * __rep_lockout_msg -- - * Coordinate with other threads processing messages so that - * we can run single-threaded and know that no incoming - * message can change the world (i.e., like a NEWMASTER message). - * This call should be short-term covering a specific critical - * operation where we need to make sure no new messages arrive - * in the middle and all message threads are out before we start it. - * Assumes the caller holds the region mutex. - * - * PUBLIC: int __rep_lockout_msg __P((ENV *, REP *, u_int32_t)); - */ -int -__rep_lockout_msg(env, rep, msg_th) - ENV *env; - REP *rep; - u_int32_t msg_th; -{ - return (__rep_lockout_int(env, rep, &rep->msg_th, msg_th, - "msg_th", REP_F_READY_MSG)); -} - -/* - * __rep_lockout_int -- - * Internal common code for locking out and coordinating - * with other areas of the code. - * Assumes the caller holds the region mutex. - * - */ -static int -__rep_lockout_int(env, rep, fieldp, field_val, msg, lockout_flag) - ENV *env; - REP *rep; - u_int32_t *fieldp; - const char *msg; - u_int32_t field_val, lockout_flag; -{ - int wait_cnt; - - F_SET(rep, lockout_flag); - for (wait_cnt = 0; *fieldp > field_val;) { - REP_SYSTEM_UNLOCK(env); - /* We're spinning - environment may be hung. Check if - * recovery has been initiated. - */ - PANIC_CHECK(env); - __os_yield(env, 1, 0); -#ifdef DIAGNOSTIC - if (wait_cnt == 5) - __db_errx(env, -"Waiting for %s (%lu) to complete replication lockout", - msg, (u_long)*fieldp); - if (++wait_cnt % 60 == 0) - __db_errx(env, -"Waiting for %s (%lu) to complete replication lockout for %d minutes", - msg, (u_long)*fieldp, wait_cnt / 60); -#endif - REP_SYSTEM_LOCK(env); - } - - COMPQUIET(msg, NULL); - return (0); -} - -/* - * __rep_send_throttle - - * Send a record, throttling if necessary. Callers of this function - * will throttle - breaking out of their loop, if the repth->type field - * changes from the normal message type to the *_MORE message type. - * This function will send the normal type unless throttling gets invoked. - * Then it sets the type field and sends the _MORE message. - * - * Throttling is always only relevant in serving requests, so we always send - * with REPCTL_RESEND. Additional desired flags can be passed in the ctlflags - * argument. - * - * PUBLIC: int __rep_send_throttle __P((ENV *, int, REP_THROTTLE *, - * PUBLIC: u_int32_t, u_int32_t)); - */ -int -__rep_send_throttle(env, eid, repth, flags, ctlflags) - ENV *env; - int eid; - REP_THROTTLE *repth; - u_int32_t ctlflags, flags; -{ - DB_REP *db_rep; - REP *rep; - u_int32_t size, typemore; - int check_limit; - - check_limit = repth->gbytes != 0 || repth->bytes != 0; - /* - * If we only want to do throttle processing and we don't have it - * turned on, return immediately. - */ - if (!check_limit && LF_ISSET(REP_THROTTLE_ONLY)) - return (0); - - db_rep = env->rep_handle; - rep = db_rep->region; - typemore = 0; - if (repth->type == REP_LOG) - typemore = REP_LOG_MORE; - if (repth->type == REP_PAGE) - typemore = REP_PAGE_MORE; - DB_ASSERT(env, typemore != 0); - - /* - * data_dbt.size is only the size of the log - * record; it doesn't count the size of the - * control structure. Factor that in as well - * so we're not off by a lot if our log records - * are small. - */ - size = repth->data_dbt->size + sizeof(__rep_control_args); - if (check_limit) { - while (repth->bytes <= size) { - if (repth->gbytes > 0) { - repth->bytes += GIGABYTE; - --(repth->gbytes); - continue; - } - /* - * We don't hold the rep mutex, - * and may miscount. - */ - STAT(rep->stat.st_nthrottles++); - repth->type = typemore; - goto send; - } - repth->bytes -= size; - } - /* - * Always send if it is typemore, otherwise send only if - * REP_THROTTLE_ONLY is not set. - * - * NOTE: It is the responsibility of the caller to marshal, if - * needed, the data_dbt. This function just sends what it is given. - */ -send: if ((repth->type == typemore || !LF_ISSET(REP_THROTTLE_ONLY)) && - (__rep_send_message(env, eid, repth->type, - &repth->lsn, repth->data_dbt, (REPCTL_RESEND | ctlflags), 0) != 0)) - return (DB_REP_UNAVAIL); - return (0); -} - -/* - * __rep_msg_to_old -- - * Convert current message numbers to old message numbers. - * - * PUBLIC: u_int32_t __rep_msg_to_old __P((u_int32_t, u_int32_t)); - */ -u_int32_t -__rep_msg_to_old(version, rectype) - u_int32_t version, rectype; -{ - /* - * We need to convert from current message numbers to old numbers and - * we need to convert from old numbers to current numbers. Offset by - * one for more readable code. - */ - /* - * Everything for version 0 is invalid, there is no version 0. - */ - static const u_int32_t table[DB_REPVERSION][REP_MAX_MSG+1] = { - /* There is no DB_REPVERSION 0. */ - { REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID }, - /* - * 4.2/DB_REPVERSION 1 no longer supported. - */ - { REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID }, - /* - * 4.3/DB_REPVERSION 2 no longer supported. - */ - { REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID }, - /* - * From 4.7 message number To 4.4/4.5 message number - */ - { REP_INVALID, /* NO message 0 */ - 1, /* REP_ALIVE */ - 2, /* REP_ALIVE_REQ */ - 3, /* REP_ALL_REQ */ - 4, /* REP_BULK_LOG */ - 5, /* REP_BULK_PAGE */ - 6, /* REP_DUPMASTER */ - 7, /* REP_FILE */ - 8, /* REP_FILE_FAIL */ - 9, /* REP_FILE_REQ */ - REP_INVALID, /* REP_LEASE_GRANT */ - 10, /* REP_LOG */ - 11, /* REP_LOG_MORE */ - 12, /* REP_LOG_REQ */ - 13, /* REP_MASTER_REQ */ - 14, /* REP_NEWCLIENT */ - 15, /* REP_NEWFILE */ - 16, /* REP_NEWMASTER */ - 17, /* REP_NEWSITE */ - 18, /* REP_PAGE */ - 19, /* REP_PAGE_FAIL */ - 20, /* REP_PAGE_MORE */ - 21, /* REP_PAGE_REQ */ - 22, /* REP_REREQUEST */ - REP_INVALID, /* REP_START_SYNC */ - 23, /* REP_UPDATE */ - 24, /* REP_UPDATE_REQ */ - 25, /* REP_VERIFY */ - 26, /* REP_VERIFY_FAIL */ - 27, /* REP_VERIFY_REQ */ - 28, /* REP_VOTE1 */ - 29 /* REP_VOTE2 */ - }, - /* - * From 4.7 message number To 4.6 message number. There are - * NO message differences between 4.6 and 4.7. The - * control structure changed. - */ - { REP_INVALID, /* NO message 0 */ - 1, /* REP_ALIVE */ - 2, /* REP_ALIVE_REQ */ - 3, /* REP_ALL_REQ */ - 4, /* REP_BULK_LOG */ - 5, /* REP_BULK_PAGE */ - 6, /* REP_DUPMASTER */ - 7, /* REP_FILE */ - 8, /* REP_FILE_FAIL */ - 9, /* REP_FILE_REQ */ - 10, /* REP_LEASE_GRANT */ - 11, /* REP_LOG */ - 12, /* REP_LOG_MORE */ - 13, /* REP_LOG_REQ */ - 14, /* REP_MASTER_REQ */ - 15, /* REP_NEWCLIENT */ - 16, /* REP_NEWFILE */ - 17, /* REP_NEWMASTER */ - 18, /* REP_NEWSITE */ - 19, /* REP_PAGE */ - 20, /* REP_PAGE_FAIL */ - 21, /* REP_PAGE_MORE */ - 22, /* REP_PAGE_REQ */ - 23, /* REP_REREQUEST */ - 24, /* REP_START_SYNC */ - 25, /* REP_UPDATE */ - 26, /* REP_UPDATE_REQ */ - 27, /* REP_VERIFY */ - 28, /* REP_VERIFY_FAIL */ - 29, /* REP_VERIFY_REQ */ - 30, /* REP_VOTE1 */ - 31 /* REP_VOTE2 */ - } - }; - return (table[version][rectype]); -} - -/* - * __rep_msg_from_old -- - * Convert old message numbers to current message numbers. - * - * PUBLIC: u_int32_t __rep_msg_from_old __P((u_int32_t, u_int32_t)); - */ -u_int32_t -__rep_msg_from_old(version, rectype) - u_int32_t version, rectype; -{ - /* - * We need to convert from current message numbers to old numbers and - * we need to convert from old numbers to current numbers. Offset by - * one for more readable code. - */ - /* - * Everything for version 0 is invalid, there is no version 0. - */ - static const u_int32_t table[DB_REPVERSION][REP_MAX_MSG+1] = { - /* There is no DB_REPVERSION 0. */ - { REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID }, - /* - * 4.2/DB_REPVERSION 1 no longer supported. - */ - { REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID }, - /* - * 4.3/DB_REPVERSION 2 no longer supported. - */ - { REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, - REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID }, - /* - * From 4.4/4.5 message number To 4.7 message number - */ - { REP_INVALID, /* NO message 0 */ - 1, /* 1, REP_ALIVE */ - 2, /* 2, REP_ALIVE_REQ */ - 3, /* 3, REP_ALL_REQ */ - 4, /* 4, REP_BULK_LOG */ - 5, /* 5, REP_BULK_PAGE */ - 6, /* 6, REP_DUPMASTER */ - 7, /* 7, REP_FILE */ - 8, /* 8, REP_FILE_FAIL */ - 9, /* 9, REP_FILE_REQ */ - /* 10, REP_LEASE_GRANT doesn't exist */ - 11, /* 10, REP_LOG */ - 12, /* 11, REP_LOG_MORE */ - 13, /* 12, REP_LOG_REQ */ - 14, /* 13, REP_MASTER_REQ */ - 15, /* 14, REP_NEWCLIENT */ - 16, /* 15, REP_NEWFILE */ - 17, /* 16, REP_NEWMASTER */ - 18, /* 17, REP_NEWSITE */ - 19, /* 18, REP_PAGE */ - 20, /* 19, REP_PAGE_FAIL */ - 21, /* 20, REP_PAGE_MORE */ - 22, /* 21, REP_PAGE_REQ */ - 23, /* 22, REP_REREQUEST */ - /* 24, REP_START_SYNC doesn't exist */ - 25, /* 23, REP_UPDATE */ - 26, /* 24, REP_UPDATE_REQ */ - 27, /* 25, REP_VERIFY */ - 28, /* 26, REP_VERIFY_FAIL */ - 29, /* 27, REP_VERIFY_REQ */ - 30, /* 28, REP_VOTE1 */ - 31, /* 29, REP_VOTE2 */ - REP_INVALID, /* 30, 4.4/4.5 no message */ - REP_INVALID /* 31, 4.4/4.5 no message */ - }, - /* - * From 4.6 message number To 4.6 message number. There are - * NO message differences between 4.6 and 4.7. The - * control structure changed. - */ - { REP_INVALID, /* NO message 0 */ - 1, /* 1, REP_ALIVE */ - 2, /* 2, REP_ALIVE_REQ */ - 3, /* 3, REP_ALL_REQ */ - 4, /* 4, REP_BULK_LOG */ - 5, /* 5, REP_BULK_PAGE */ - 6, /* 6, REP_DUPMASTER */ - 7, /* 7, REP_FILE */ - 8, /* 8, REP_FILE_FAIL */ - 9, /* 9, REP_FILE_REQ */ - 10, /* 10, REP_LEASE_GRANT */ - 11, /* 11, REP_LOG */ - 12, /* 12, REP_LOG_MORE */ - 13, /* 13, REP_LOG_REQ */ - 14, /* 14, REP_MASTER_REQ */ - 15, /* 15, REP_NEWCLIENT */ - 16, /* 16, REP_NEWFILE */ - 17, /* 17, REP_NEWMASTER */ - 18, /* 18, REP_NEWSITE */ - 19, /* 19, REP_PAGE */ - 20, /* 20, REP_PAGE_FAIL */ - 21, /* 21, REP_PAGE_MORE */ - 22, /* 22, REP_PAGE_REQ */ - 23, /* 22, REP_REREQUEST */ - 24, /* 24, REP_START_SYNC */ - 25, /* 25, REP_UPDATE */ - 26, /* 26, REP_UPDATE_REQ */ - 27, /* 27, REP_VERIFY */ - 28, /* 28, REP_VERIFY_FAIL */ - 29, /* 29, REP_VERIFY_REQ */ - 30, /* 30, REP_VOTE1 */ - 31 /* 31, REP_VOTE2 */ - } - }; - return (table[version][rectype]); -} - -/* - * __rep_print -- - * Optionally print a verbose message. - * - * PUBLIC: void __rep_print __P((ENV *, const char *, ...)) - * PUBLIC: __attribute__ ((__format__ (__printf__, 2, 3))); - */ -void -#ifdef STDC_HEADERS -__rep_print(ENV *env, const char *fmt, ...) -#else -__rep_print(env, fmt, va_alist) - ENV *env; - const char *fmt; - va_dcl -#endif -{ - va_list ap; - DB_MSGBUF mb; - REP *rep; - db_timespec ts; - pid_t pid; - db_threadid_t tid; - const char *s; - char buf[DB_THREADID_STRLEN]; - - DB_MSGBUF_INIT(&mb); - - s = NULL; - if (env->dbenv->db_errpfx != NULL) - s = env->dbenv->db_errpfx; - else if (REP_ON(env)) { - rep = env->rep_handle->region; - if (F_ISSET(rep, REP_F_CLIENT)) - s = "CLIENT"; - else if (F_ISSET(rep, REP_F_MASTER)) - s = "MASTER"; - } - if (s == NULL) - s = "REP_UNDEF"; - __os_gettime(env, &ts, 1); - __os_id(env->dbenv, &pid, &tid); - __db_msgadd(env, &mb, "[%lu:%lu][%s] %s: ", - (u_long)ts.tv_sec, (u_long)ts.tv_nsec/NS_PER_US, - env->dbenv->thread_id_string(env->dbenv, pid, tid, buf), s); - -#ifdef STDC_HEADERS - va_start(ap, fmt); -#else - va_start(ap); -#endif - __db_msgadd_ap(env, &mb, fmt, ap); - va_end(ap); - - DB_MSGBUF_FLUSH(env, &mb); -} - -/* - * PUBLIC: void __rep_print_message - * PUBLIC: __P((ENV *, int, __rep_control_args *, char *, u_int32_t)); - */ -void -__rep_print_message(env, eid, rp, str, flags) - ENV *env; - int eid; - __rep_control_args *rp; - char *str; - u_int32_t flags; -{ - u_int32_t ctlflags, rectype; - char ftype[64], *type; - - rectype = rp->rectype; - ctlflags = rp->flags; - if (rp->rep_version != DB_REPVERSION) - rectype = __rep_msg_from_old(rp->rep_version, rectype); - switch (rectype) { - case REP_ALIVE: - type = "alive"; - break; - case REP_ALIVE_REQ: - type = "alive_req"; - break; - case REP_ALL_REQ: - type = "all_req"; - break; - case REP_BULK_LOG: - type = "bulk_log"; - break; - case REP_BULK_PAGE: - type = "bulk_page"; - break; - case REP_DUPMASTER: - type = "dupmaster"; - break; - case REP_FILE: - type = "file"; - break; - case REP_FILE_FAIL: - type = "file_fail"; - break; - case REP_FILE_REQ: - type = "file_req"; - break; - case REP_LEASE_GRANT: - type = "lease_grant"; - break; - case REP_LOG: - type = "log"; - break; - case REP_LOG_MORE: - type = "log_more"; - break; - case REP_LOG_REQ: - type = "log_req"; - break; - case REP_MASTER_REQ: - type = "master_req"; - break; - case REP_NEWCLIENT: - type = "newclient"; - break; - case REP_NEWFILE: - type = "newfile"; - break; - case REP_NEWMASTER: - type = "newmaster"; - break; - case REP_NEWSITE: - type = "newsite"; - break; - case REP_PAGE: - type = "page"; - break; - case REP_PAGE_FAIL: - type = "page_fail"; - break; - case REP_PAGE_MORE: - type = "page_more"; - break; - case REP_PAGE_REQ: - type = "page_req"; - break; - case REP_REREQUEST: - type = "rerequest"; - break; - case REP_START_SYNC: - type = "start_sync"; - break; - case REP_UPDATE: - type = "update"; - break; - case REP_UPDATE_REQ: - type = "update_req"; - break; - case REP_VERIFY: - type = "verify"; - break; - case REP_VERIFY_FAIL: - type = "verify_fail"; - break; - case REP_VERIFY_REQ: - type = "verify_req"; - break; - case REP_VOTE1: - type = "vote1"; - break; - case REP_VOTE2: - type = "vote2"; - break; - default: - type = "NOTYPE"; - break; - } - - /* - * !!! - * If adding new flags to print out make sure the aggregate - * length cannot overflow the buffer. - */ - ftype[0] = '\0'; - if (LF_ISSET(DB_REP_ANYWHERE)) - (void)strcat(ftype, " any"); /* 4 */ - if (FLD_ISSET(ctlflags, REPCTL_FLUSH)) - (void)strcat(ftype, " flush"); /* 10 */ - /* - * We expect most of the time the messages will indicate - * group membership. Only print if we're not already - * part of a group. - */ - if (!FLD_ISSET(ctlflags, REPCTL_GROUP_ESTD)) - (void)strcat(ftype, " nogroup"); /* 18 */ - if (FLD_ISSET(ctlflags, REPCTL_LEASE)) - (void)strcat(ftype, " lease"); /* 24 */ - if (LF_ISSET(DB_REP_NOBUFFER)) - (void)strcat(ftype, " nobuf"); /* 30 */ - if (FLD_ISSET(ctlflags, REPCTL_PERM)) - (void)strcat(ftype, " perm"); /* 35 */ - if (LF_ISSET(DB_REP_REREQUEST)) - (void)strcat(ftype, " rereq"); /* 41 */ - if (FLD_ISSET(ctlflags, REPCTL_RESEND)) - (void)strcat(ftype, " resend"); /* 48 */ - if (FLD_ISSET(ctlflags, REPCTL_LOG_END)) - (void)strcat(ftype, " logend"); /* 55 */ - RPRINT(env, DB_VERB_REP_MSGS, - (env, - "%s %s: msgv = %lu logv %lu gen = %lu eid %d, type %s, LSN [%lu][%lu] %s", - env->db_home, str, - (u_long)rp->rep_version, (u_long)rp->log_version, (u_long)rp->gen, - eid, type, (u_long)rp->lsn.file, (u_long)rp->lsn.offset, ftype)); - /* - * Make sure the version is close, and not swapped - * here. Check for current version, +/- a little bit. - */ - DB_ASSERT(env, rp->rep_version <= DB_REPVERSION+10); - DB_ASSERT(env, rp->log_version <= DB_LOGVERSION+10); -} - -/* - * PUBLIC: void __rep_fire_event __P((ENV *, u_int32_t, void *)); - */ -void -__rep_fire_event(env, event, info) - ENV *env; - u_int32_t event; - void *info; -{ - int ret; - - /* - * Give repmgr first crack at handling all replication-related events. - * If it can't (or chooses not to) handle the event fully, then pass it - * along to the application. - */ - ret = __repmgr_handle_event(env, event, info); - DB_ASSERT(env, ret == 0 || ret == DB_EVENT_NOT_HANDLED); - - if (ret == DB_EVENT_NOT_HANDLED) - DB_EVENT(env, event, info); -} |