| field | value | date |
|---|---|---|
| author | Anas Nashif <anas.nashif@intel.com> | 2012-10-30 16:00:08 -0700 |
| committer | Anas Nashif <anas.nashif@intel.com> | 2012-10-30 16:00:08 -0700 |
| commit | 7edf6e8ac0df452d4af7a15da08609821b0b3c0f (patch) | |
| tree | 1cf0f01d9b6574972173e3cd40b62e4ebeaaaaae /qam | |
| download | db4-7edf6e8ac0df452d4af7a15da08609821b0b3c0f.tar.gz, db4-7edf6e8ac0df452d4af7a15da08609821b0b3c0f.tar.bz2, db4-7edf6e8ac0df452d4af7a15da08609821b0b3c0f.zip | |
Imported Upstream version 4.8.30.NC (ref: upstream/4.8.30.NC)
Diffstat (limited to 'qam')
| mode | file | lines |
|---|---|---|
| -rw-r--r-- | qam/qam.c | 1778 |
| -rw-r--r-- | qam/qam.src | 90 |
| -rw-r--r-- | qam/qam_auto.c | 1310 |
| -rw-r--r-- | qam/qam_autop.c | 260 |
| -rw-r--r-- | qam/qam_conv.c | 79 |
| -rw-r--r-- | qam/qam_files.c | 894 |
| -rw-r--r-- | qam/qam_method.c | 398 |
| -rw-r--r-- | qam/qam_open.c | 352 |
| -rw-r--r-- | qam/qam_rec.c | 663 |
| -rw-r--r-- | qam/qam_stat.c | 253 |
| -rw-r--r-- | qam/qam_stub.c | 340 |
| -rw-r--r-- | qam/qam_upgrade.c | 101 |
| -rw-r--r-- | qam/qam_verify.c | 633 |
13 files changed, 7151 insertions, 0 deletions
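The imported qam/ sources implement Berkeley DB's Queue access method (fixed-length records addressed by record number, with `DB_APPEND` handled by `__qam_append` and `DB_CONSUME` handled by `__qamc_get` in the diff below). For orientation only, here is a minimal sketch of how an application exercises this code through the public Berkeley DB 4.8 C API; it is not part of the commit, error handling is abbreviated, and the database name `work.queue`, the 32-byte record length, and the function name `queue_example` are arbitrary examples.

```c
/* Illustrative sketch, not part of the imported diff. */
#include <string.h>
#include <db.h>

int
queue_example(void)
{
	DB *dbp;
	DBT key, data;
	db_recno_t recno;
	char buf[32];
	int ret;

	if ((ret = db_create(&dbp, NULL, 0)) != 0)
		return (ret);
	/* Queue records are fixed length; set the length and pad byte. */
	if ((ret = dbp->set_re_len(dbp, sizeof(buf))) != 0 ||
	    (ret = dbp->set_re_pad(dbp, ' ')) != 0 ||
	    (ret = dbp->open(dbp,
	    NULL, "work.queue", NULL, DB_QUEUE, DB_CREATE, 0664)) != 0)
		goto err;

	memset(&key, 0, sizeof(key));
	memset(&data, 0, sizeof(data));

	/* DB_APPEND allocates the next record number (__qam_append). */
	key.data = &recno;
	key.ulen = sizeof(recno);
	key.flags = DB_DBT_USERMEM;
	memset(buf, ' ', sizeof(buf));
	memcpy(buf, "hello", 5);
	data.data = buf;
	data.size = sizeof(buf);
	if ((ret = dbp->put(dbp, NULL, &key, &data, DB_APPEND)) != 0)
		goto err;

	/* DB_CONSUME reads and deletes the head of the queue (__qamc_get). */
	data.data = buf;
	data.ulen = sizeof(buf);
	data.flags = DB_DBT_USERMEM;
	ret = dbp->get(dbp, NULL, &key, &data, DB_CONSUME);

err:	(void)dbp->close(dbp, 0);
	return (ret);
}
```

The raw diff of the imported files follows unchanged.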
diff --git a/qam/qam.c b/qam/qam.c new file mode 100644 index 00000000..5f73f738 --- /dev/null +++ b/qam/qam.c @@ -0,0 +1,1778 @@ +/*- + * See the file LICENSE for redistribution information. + * + * Copyright (c) 1999-2009 Oracle. All rights reserved. + * + * $Id$ + */ + +#include "db_config.h" + +#include "db_int.h" +#include "dbinc/db_page.h" +#include "dbinc/btree.h" +#include "dbinc/lock.h" +#include "dbinc/log.h" +#include "dbinc/mp.h" +#include "dbinc/qam.h" + +static int __qam_bulk __P((DBC *, DBT *, u_int32_t)); +static int __qamc_close __P((DBC *, db_pgno_t, int *)); +static int __qamc_del __P((DBC *, u_int32_t)); +static int __qamc_destroy __P((DBC *)); +static int __qamc_get __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *)); +static int __qamc_put __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *)); +static int __qam_consume __P((DBC *, QMETA *, db_recno_t)); +static int __qam_getno __P((DB *, const DBT *, db_recno_t *)); + +#define DONT_NEED_LOCKS(dbc) ((dbc)->txn == NULL || \ + F_ISSET(dbc, DBC_READ_COMMITTED | DBC_READ_UNCOMMITTED)) + +/* + * __qam_position -- + * Position a queued access method cursor at a record. This returns + * the page locked. *exactp will be set if the record is valid. + * PUBLIC: int __qam_position + * PUBLIC: __P((DBC *, db_recno_t *, u_int32_t, int *)); + */ +int +__qam_position(dbc, recnop, get_mode, exactp) + DBC *dbc; /* open cursor */ + db_recno_t *recnop; /* pointer to recno to find */ + u_int32_t get_mode; /* flags to __memp_fget */ + int *exactp; /* indicate if it was found */ +{ + DB *dbp; + QAMDATA *qp; + QUEUE_CURSOR *cp; + db_pgno_t pg; + int ret; + + dbp = dbc->dbp; + cp = (QUEUE_CURSOR *)dbc->internal; + + /* Fetch the page for this recno. */ + cp->pgno = pg = QAM_RECNO_PAGE(dbp, *recnop); + + cp->page = NULL; + *exactp = 0; + if ((ret = __qam_fget(dbc, &pg, get_mode, &cp->page)) != 0) { + if (!FLD_ISSET(get_mode, DB_MPOOL_CREATE) && + (ret == DB_PAGE_NOTFOUND || ret == ENOENT)) + ret = 0; + return (ret); + } + cp->indx = QAM_RECNO_INDEX(dbp, pg, *recnop); + + if (PGNO(cp->page) == 0) { + /* + * We have read an uninitialized page: set the page number if + * we're creating the page. Otherwise, we know that the record + * doesn't exist yet. + */ + if (!FLD_ISSET(get_mode, DB_MPOOL_CREATE)) { + *exactp = 0; + return (0); + } + DB_ASSERT(dbp->env, FLD_ISSET(get_mode, DB_MPOOL_CREATE)); + PGNO(cp->page) = pg; + TYPE(cp->page) = P_QAMDATA; + } + + qp = QAM_GET_RECORD(dbp, cp->page, cp->indx); + *exactp = F_ISSET(qp, QAM_VALID) ? 1 : 0; + + return (ret); +} + +/* + * __qam_pitem -- + * Put an item on a queue page. Copy the data to the page and set the + * VALID and SET bits. If logging and the record was previously set, + * log that data, otherwise just log the new data. 
+ * + * pagep must be write locked + * + * PUBLIC: int __qam_pitem + * PUBLIC: __P((DBC *, QPAGE *, u_int32_t, db_recno_t, DBT *)); + */ +int +__qam_pitem(dbc, pagep, indx, recno, data) + DBC *dbc; + QPAGE *pagep; + u_int32_t indx; + db_recno_t recno; + DBT *data; +{ + DB *dbp; + DBT olddata, pdata, *datap; + ENV *env; + QAMDATA *qp; + QUEUE *t; + u_int8_t *dest, *p; + int allocated, ret; + + dbp = dbc->dbp; + env = dbp->env; + t = (QUEUE *)dbp->q_internal; + allocated = ret = 0; + + if (data->size > t->re_len) + return (__db_rec_toobig(env, data->size, t->re_len)); + qp = QAM_GET_RECORD(dbp, pagep, indx); + + p = qp->data; + datap = data; + if (F_ISSET(data, DB_DBT_PARTIAL)) { + if (data->doff + data->dlen > t->re_len) { + __db_errx(env, + "%s: data offset plus length larger than record size of %lu", + "Record length error", (u_long)t->re_len); + return (EINVAL); + } + + if (data->size != data->dlen) + return (__db_rec_repl(env, data->size, data->dlen)); + + if (data->size == t->re_len) + goto no_partial; + + /* + * If we are logging, then we have to build the record + * first, otherwise, we can simply drop the change + * directly on the page. After this clause, make + * sure that datap and p are set up correctly so that + * copying datap into p does the right thing. + * + * Note, I am changing this so that if the existing + * record is not valid, we create a complete record + * to log so that both this and the recovery code is simpler. + */ + + if (DBC_LOGGING(dbc) || !F_ISSET(qp, QAM_VALID)) { + datap = &pdata; + memset(datap, 0, sizeof(*datap)); + + if ((ret = __os_malloc(env, + t->re_len, &datap->data)) != 0) + return (ret); + allocated = 1; + datap->size = t->re_len; + + /* + * Construct the record if it's valid, otherwise set it + * all to the pad character. + */ + dest = datap->data; + if (F_ISSET(qp, QAM_VALID)) + memcpy(dest, p, t->re_len); + else + memset(dest, (int)t->re_pad, t->re_len); + + dest += data->doff; + memcpy(dest, data->data, data->size); + } else { + datap = data; + p += data->doff; + } + } + +no_partial: + if (DBC_LOGGING(dbc)) { + olddata.size = 0; + if (F_ISSET(qp, QAM_SET)) { + olddata.data = qp->data; + olddata.size = t->re_len; + } + if ((ret = __qam_add_log(dbp, dbc->txn, &LSN(pagep), + 0, &LSN(pagep), pagep->pgno, + indx, recno, datap, qp->flags, + olddata.size == 0 ? NULL : &olddata)) != 0) + goto err; + } else if (!F_ISSET((dbc), DBC_RECOVER)) + LSN_NOT_LOGGED(LSN(pagep)); + + F_SET(qp, QAM_VALID | QAM_SET); + memcpy(p, datap->data, datap->size); + if (!F_ISSET(data, DB_DBT_PARTIAL)) + memset(p + datap->size, + (int)t->re_pad, t->re_len - datap->size); + +err: if (allocated) + __os_free(env, datap->data); + + return (ret); +} +/* + * __qamc_put + * Cursor put for queued access method. + * BEFORE and AFTER cannot be specified. 
+ */ +static int +__qamc_put(dbc, key, data, flags, pgnop) + DBC *dbc; + DBT *key, *data; + u_int32_t flags; + db_pgno_t *pgnop; +{ + DB *dbp; + DB_MPOOLFILE *mpf; + ENV *env; + QMETA *meta; + QUEUE_CURSOR *cp; + db_pgno_t pg; + db_recno_t new_cur, new_first; + u_int32_t opcode; + int exact, ret, t_ret, writelock; + + dbp = dbc->dbp; + env = dbp->env; + mpf = dbp->mpf; + if (pgnop != NULL) + *pgnop = PGNO_INVALID; + + cp = (QUEUE_CURSOR *)dbc->internal; + + switch (flags) { + case DB_KEYFIRST: + case DB_KEYLAST: + case DB_NOOVERWRITE: + case DB_OVERWRITE_DUP: + if ((ret = __qam_getno(dbp, key, &cp->recno)) != 0) + return (ret); + /* FALLTHROUGH */ + case DB_CURRENT: + break; + default: + /* The interface shouldn't let anything else through. */ + return (__db_ferr(env, "DBC->put", 0)); + } + + /* Write lock the record. */ + if ((ret = __db_lget(dbc, LCK_COUPLE, + cp->recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &cp->lock)) != 0) + return (ret); + + if ((ret = __qam_position(dbc, &cp->recno, + DB_MPOOL_CREATE | DB_MPOOL_DIRTY, &exact)) != 0) { + /* We could not get the page, we can release the record lock. */ + (void)__LPUT(dbc, cp->lock); + return (ret); + } + + if (exact != 0 && flags == DB_NOOVERWRITE) + ret = DB_KEYEXIST; + else + /* Put the item on the page. */ + ret = __qam_pitem(dbc, + (QPAGE *)cp->page, cp->indx, cp->recno, data); + + if ((t_ret = __qam_fput(dbc, + cp->pgno, cp->page, dbc->priority)) != 0 && ret == 0) + ret = t_ret; + cp->page = NULL; + cp->lock_mode = DB_LOCK_WRITE; + if (ret != 0) + return (ret); + + /* We may need to reset the head or tail of the queue. */ + pg = ((QUEUE *)dbp->q_internal)->q_meta; + + writelock = 0; + if ((ret = __db_lget(dbc, LCK_COUPLE, + pg, DB_LOCK_READ, 0, &cp->lock)) != 0) + return (ret); + if ((ret = __memp_fget(mpf, &pg, + dbc->thread_info, dbc->txn, 0, &meta)) != 0) + goto err; + + opcode = 0; + new_cur = new_first = 0; + + /* + * If the put address is outside the queue, adjust the head and + * tail of the queue. If the order is inverted we move + * the one which is closer. The first case is when the + * queue is empty, move first and current to where the new + * insert is. + */ + +recheck: + if (meta->first_recno == meta->cur_recno) { + new_first = cp->recno; + new_cur = cp->recno + 1; + if (new_cur == RECNO_OOB) + new_cur++; + opcode |= QAM_SETFIRST; + opcode |= QAM_SETCUR; + } else { + if (QAM_BEFORE_FIRST(meta, cp->recno)) { + new_first = cp->recno; + opcode |= QAM_SETFIRST; + } + + if (QAM_AFTER_CURRENT(meta, cp->recno)) { + new_cur = cp->recno + 1; + if (new_cur == RECNO_OOB) + new_cur++; + opcode |= QAM_SETCUR; + } + } + + if (opcode == 0) + goto done; + + /* Drop the read lock and get the a write lock on the meta page. */ + if (writelock == 0 && (ret = __db_lget(dbc, LCK_COUPLE_ALWAYS, + pg, DB_LOCK_WRITE, 0, &cp->lock)) != 0) + goto done; + if (writelock++ == 0) + goto recheck; + + if (((ret = __memp_dirty(mpf, &meta, + dbc->thread_info, dbc->txn, dbc->priority, DB_MPOOL_DIRTY)) != 0 || + (DBC_LOGGING(dbc) && + (ret = __qam_mvptr_log(dbp, dbc->txn, + &meta->dbmeta.lsn, 0, opcode, meta->first_recno, + new_first, meta->cur_recno, new_cur, + &meta->dbmeta.lsn, PGNO_BASE_MD)) != 0))) + opcode = 0; + + if (opcode & QAM_SETCUR) + meta->cur_recno = new_cur; + if (opcode & QAM_SETFIRST) + meta->first_recno = new_first; + +done: if (meta != NULL && (t_ret = __memp_fput(mpf, + dbc->thread_info, meta, dbc->priority)) != 0 && ret == 0) + ret = t_ret; + +err: /* Don't hold the meta page long term. 
*/ + if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0) + ret = t_ret; + return (ret); +} + +/* + * __qam_append -- + * Perform a put(DB_APPEND) in queue. + * + * PUBLIC: int __qam_append __P((DBC *, DBT *, DBT *)); + */ +int +__qam_append(dbc, key, data) + DBC *dbc; + DBT *key, *data; +{ + DB *dbp; + DB_LOCK lock; + DB_MPOOLFILE *mpf; + QMETA *meta; + QPAGE *page; + QUEUE *qp; + QUEUE_CURSOR *cp; + db_pgno_t pg, metapg; + db_recno_t recno; + int ret, t_ret; + + dbp = dbc->dbp; + mpf = dbp->mpf; + cp = (QUEUE_CURSOR *)dbc->internal; + + /* Write lock the meta page. */ + metapg = ((QUEUE *)dbp->q_internal)->q_meta; + if ((ret = __db_lget(dbc, 0, metapg, DB_LOCK_WRITE, 0, &lock)) != 0) + return (ret); + if ((ret = __memp_fget(mpf, &metapg, + dbc->thread_info, dbc->txn, DB_MPOOL_DIRTY, &meta)) != 0) + return (ret); + + /* Get the next record number. */ + recno = meta->cur_recno; + meta->cur_recno++; + if (meta->cur_recno == RECNO_OOB) + meta->cur_recno++; + if (meta->cur_recno == meta->first_recno) { + meta->cur_recno--; + if (meta->cur_recno == RECNO_OOB) + meta->cur_recno--; + + if (ret == 0) + ret = EFBIG; + goto err; + } + + if (QAM_BEFORE_FIRST(meta, recno)) + meta->first_recno = recno; + + /* Lock the record and release meta page lock. */ + ret = __db_lget(dbc, LCK_COUPLE_ALWAYS, + recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &lock); + /* Release the meta page. */ + if ((t_ret = __memp_fput(mpf, + dbc->thread_info, meta, dbc->priority)) != 0 && ret == 0) + ret = t_ret; + meta = NULL; + + /* + * The application may modify the data based on the selected record + * number. We always want to call this even if we ultimately end + * up aborting, because we are allocating a record number, regardless. + */ + if (dbc->dbp->db_append_recno != NULL && + (t_ret = dbc->dbp->db_append_recno(dbc->dbp, data, recno)) != 0 && + ret == 0) + ret = t_ret; + + /* + * Capture errors from either the lock couple or the call to + * dbp->db_append_recno. + */ + if (ret != 0) + goto err; + + cp->lock = lock; + cp->lock_mode = DB_LOCK_WRITE; + LOCK_INIT(lock); + + pg = QAM_RECNO_PAGE(dbp, recno); + + /* Fetch for write the data page. */ + if ((ret = __qam_fget(dbc, &pg, + DB_MPOOL_CREATE | DB_MPOOL_DIRTY, &page)) != 0) + goto err; + + /* See if this is a new page. */ + if (page->pgno == 0) { + page->pgno = pg; + page->type = P_QAMDATA; + } + + /* Put the item on the page and log it. */ + ret = __qam_pitem(dbc, page, + QAM_RECNO_INDEX(dbp, pg, recno), recno, data); + + if ((t_ret = __qam_fput(dbc, + pg, page, dbc->priority)) != 0 && ret == 0) + ret = t_ret; + + /* Return the record number to the user. */ + if (ret == 0 && key != NULL) + ret = __db_retcopy(dbp->env, key, + &recno, sizeof(recno), &dbc->rkey->data, &dbc->rkey->ulen); + + /* Position the cursor on this record. */ + cp->recno = recno; + + /* See if we are leaving the extent. */ + qp = (QUEUE *) dbp->q_internal; + if (qp->page_ext != 0 && + (recno % (qp->page_ext * qp->rec_page) == 0 || + recno == UINT32_MAX)) { + if ((ret = __db_lget(dbc, + 0, metapg, DB_LOCK_READ, 0, &lock)) != 0) + goto err; + if ((ret = __memp_fget(mpf, &metapg, + dbc->thread_info, dbc->txn, 0, &meta)) != 0) + goto err; + if (!QAM_AFTER_CURRENT(meta, recno)) + ret = __qam_fclose(dbp, pg); + } + +err: /* Release the meta page. 
*/ + if (meta != NULL && (t_ret = __memp_fput(mpf, + dbc->thread_info, meta, dbc->priority)) != 0 && ret == 0) + ret = t_ret; + if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0) + ret = t_ret; + + return (ret); +} + +/* + * __qamc_del -- + * Qam cursor->am_del function + */ +static int +__qamc_del(dbc, flags) + DBC *dbc; + u_int32_t flags; +{ + DB *dbp; + DBT data; + DB_LOCK metalock; + DB_MPOOLFILE *mpf; + PAGE *pagep; + QAMDATA *qp; + QMETA *meta; + QUEUE_CURSOR *cp; + db_pgno_t pg; + db_recno_t first; + int exact, ret, t_ret; + + dbp = dbc->dbp; + mpf = dbp->mpf; + cp = (QUEUE_CURSOR *)dbc->internal; + + pg = ((QUEUE *)dbp->q_internal)->q_meta; + /* Read lock the meta page. */ + if ((ret = __db_lget(dbc, 0, pg, DB_LOCK_READ, 0, &metalock)) != 0) + return (ret); + + if ((ret = __memp_fget(mpf, &pg, + dbc->thread_info, dbc->txn, 0, &meta)) != 0) + return (ret); + + if (QAM_NOT_VALID(meta, cp->recno)) { + ret = DB_NOTFOUND; + goto err; + } + first = meta->first_recno; + + /* Don't hold the meta page long term. */ + if ((ret = __memp_fput(mpf, + dbc->thread_info, meta, dbc->priority)) != 0) + goto err; + meta = NULL; + if ((ret = __LPUT(dbc, metalock)) != 0) + goto err; + + /* Get the record. */ + if ((ret = __db_lget(dbc, LCK_COUPLE, + cp->recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &cp->lock)) != 0) + goto err; + cp->lock_mode = DB_LOCK_WRITE; + + /* Find the record; delete only deletes exact matches. */ + if ((ret = __qam_position(dbc, &cp->recno, + DB_MPOOL_DIRTY, &exact)) != 0) + goto err; + + if (!exact) { + ret = DB_NOTFOUND; + goto err; + } + + pagep = cp->page; + qp = QAM_GET_RECORD(dbp, pagep, cp->indx); + + if (DBC_LOGGING(dbc)) { + if (((QUEUE *)dbp->q_internal)->page_ext == 0 || + ((QUEUE *)dbp->q_internal)->re_len == 0) { + if ((ret = __qam_del_log(dbp, + dbc->txn, &LSN(pagep), 0, &LSN(pagep), + pagep->pgno, cp->indx, cp->recno)) != 0) + goto err; + } else { + data.size = ((QUEUE *)dbp->q_internal)->re_len; + data.data = qp->data; + if ((ret = __qam_delext_log(dbp, + dbc->txn, &LSN(pagep), 0, &LSN(pagep), + pagep->pgno, cp->indx, cp->recno, &data)) != 0) + goto err; + } + } else + LSN_NOT_LOGGED(LSN(pagep)); + + F_CLR(qp, QAM_VALID); + if ((ret = __qam_fput(dbc, + cp->pgno, cp->page, dbc->priority)) != 0) + goto err; + cp->page = NULL; + + /* + * Other threads cannot move first_recno past + * our position while we have the record locked. + * If it's pointing at the deleted record then lock + * the metapage and check again as lower numbered + * record may have been inserted. + */ + if (LF_ISSET(DB_CONSUME) || cp->recno == first) { + pg = ((QUEUE *)dbp->q_internal)->q_meta; + if ((ret = + __db_lget(dbc, 0, pg, DB_LOCK_WRITE, 0, &metalock)) != 0) + goto err; + if ((ret = __memp_fget(mpf, &pg, + dbc->thread_info, dbc->txn, 0, &meta)) != 0) + goto err; + if (LF_ISSET(DB_CONSUME) || cp->recno == meta->first_recno) + ret = __qam_consume(dbc, meta, meta->first_recno); + } + +err: if (meta != NULL && (t_ret = __memp_fput(mpf, dbc->thread_info, + meta, dbc->priority)) != 0 && ret == 0) + ret = t_ret; + /* Don't hold the meta page long term. */ + if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0) + ret = t_ret; + + if (cp->page != NULL && + (t_ret = __qam_fput(dbc, + cp->pgno, cp->page, dbc->priority)) != 0 && ret == 0) + ret = t_ret; + cp->page = NULL; + + return (ret); +} + +#ifdef DEBUG_WOP +#define QDEBUG +#endif + +/* + * __qamc_get -- + * Queue DBC->get function. 
+ */ +static int +__qamc_get(dbc, key, data, flags, pgnop) + DBC *dbc; + DBT *key, *data; + u_int32_t flags; + db_pgno_t *pgnop; +{ + DB *dbp; + DBC *dbcdup; + DBT tmp; + DB_LOCK lock, metalock; + DB_MPOOLFILE *mpf; + ENV *env; + PAGE *pg; + QAMDATA *qp; + QMETA *meta; + QUEUE *t; + QUEUE_CURSOR *cp; + db_lockmode_t lock_mode, meta_mode; + db_pgno_t metapno; + db_recno_t first; + int exact, inorder, is_first, locked, ret, t_ret, wait, with_delete; + int retrying; + + dbp = dbc->dbp; + env = dbp->env; + mpf = dbp->mpf; + cp = (QUEUE_CURSOR *)dbc->internal; + LOCK_INIT(lock); + + lock_mode = F_ISSET(dbc, DBC_RMW) ? DB_LOCK_WRITE : DB_LOCK_READ; + meta_mode = DB_LOCK_READ; + meta = NULL; + *pgnop = 0; + pg = NULL; + retrying = t_ret = wait = with_delete = 0; + + if (flags == DB_CONSUME_WAIT) { + wait = 1; + flags = DB_CONSUME; + } + if (flags == DB_CONSUME) { + with_delete = 1; + flags = DB_FIRST; + meta_mode = lock_mode = DB_LOCK_WRITE; + } + inorder = F_ISSET(dbp, DB_AM_INORDER) && with_delete; + + DEBUG_LREAD(dbc, dbc->txn, "qamc_get", + flags == DB_SET || flags == DB_SET_RANGE ? key : NULL, NULL, flags); + + /* Make lint and friends happy. */ + locked = 0; + + is_first = 0; + first = 0; + + t = (QUEUE *)dbp->q_internal; + metapno = t->q_meta; + LOCK_INIT(metalock); + + /* + * Get the meta page first + */ + if (locked == 0 && (ret = __db_lget(dbc, + 0, metapno, meta_mode, 0, &metalock)) != 0) + goto err; + locked = 1; + if ((ret = __memp_fget(mpf, &metapno, + dbc->thread_info, dbc->txn, 0, &meta)) != 0) + return (ret); + + /* Release any previous lock if not in a transaction. */ + if ((ret = __TLPUT(dbc, cp->lock)) != 0) + goto err; + +retry: /* Update the record number. */ + switch (flags) { + case DB_CURRENT: + break; + case DB_NEXT_DUP: + case DB_PREV_DUP: + ret = DB_NOTFOUND; + goto err; + /* NOTREACHED */ + case DB_NEXT: + case DB_NEXT_NODUP: + if (cp->recno != RECNO_OOB) { + ++cp->recno; + /* Wrap around, skipping zero. */ + if (cp->recno == RECNO_OOB) + cp->recno++; + /* + * Check to see if we are out of data. + */ + if (QAM_AFTER_CURRENT(meta, cp->recno)) { + pg = NULL; + if (!wait) { + ret = DB_NOTFOUND; + goto err; + } + flags = DB_FIRST; + /* + * If first is not set, then we skipped + * a locked record, go back and find it. + * If we find a locked record again + * wait for it. + */ + if (first == 0) { + retrying = 1; + goto retry; + } + + if (CDB_LOCKING(env)) { + /* Drop the metapage before we wait. */ + ret = __memp_fput(mpf, dbc->thread_info, + meta, dbc->priority); + meta = NULL; + if (ret != 0) + goto err; + if ((ret = __lock_get( + env, dbc->locker, + DB_LOCK_SWITCH, &dbc->lock_dbt, + DB_LOCK_WAIT, &dbc->mylock)) != 0) + goto err; + + if ((ret = __lock_get( + env, dbc->locker, + DB_LOCK_UPGRADE, &dbc->lock_dbt, + DB_LOCK_WRITE, &dbc->mylock)) != 0) + goto err; + if ((ret = __memp_fget(mpf, &metapno, + dbc->thread_info, + dbc->txn, 0, &meta)) != 0) + goto err; + goto retry; + } + /* + * Wait for someone to update the meta page. + * This will probably mean there is something + * in the queue. We then go back up and + * try again. + */ + if (locked == 0) { + if ((ret = __db_lget(dbc, 0, metapno, + meta_mode, 0, &metalock)) != 0) + goto err; + locked = 1; + if (cp->recno != RECNO_OOB && + !QAM_AFTER_CURRENT(meta, cp->recno)) + goto retry; + } + /* Drop the metapage before we wait. 
*/ + ret = __memp_fput(mpf, + dbc->thread_info, meta, dbc->priority); + meta = NULL; + if (ret != 0) + goto err; + if ((ret = __db_lget(dbc, + 0, metapno, DB_LOCK_WAIT, + DB_LOCK_SWITCH, &metalock)) != 0) { + if (ret == DB_LOCK_DEADLOCK) + ret = DB_LOCK_NOTGRANTED; + goto err; + } + if ((ret = __db_lget(dbc, 0, + PGNO_INVALID, DB_LOCK_WRITE, + DB_LOCK_UPGRADE, &metalock)) != 0) { + if (ret == DB_LOCK_DEADLOCK) + ret = DB_LOCK_NOTGRANTED; + goto err; + } + if ((ret = __memp_fget(mpf, + &metapno, dbc->thread_info, dbc->txn, + 0, &meta)) != 0) + goto err; + locked = 1; + goto retry; + } + break; + } + /* FALLTHROUGH */ + case DB_FIRST: + flags = DB_NEXT; + is_first = 1; + + /* get the first record number */ + cp->recno = first = meta->first_recno; + + break; + case DB_PREV: + case DB_PREV_NODUP: + if (cp->recno != RECNO_OOB) { + if (cp->recno == meta->first_recno || + QAM_BEFORE_FIRST(meta, cp->recno)) { + ret = DB_NOTFOUND; + goto err; + } + --cp->recno; + /* Wrap around, skipping zero. */ + if (cp->recno == RECNO_OOB) + --cp->recno; + break; + } + /* FALLTHROUGH */ + case DB_LAST: + if (meta->first_recno == meta->cur_recno) { + ret = DB_NOTFOUND; + goto err; + } + cp->recno = meta->cur_recno - 1; + if (cp->recno == RECNO_OOB) + cp->recno--; + break; + case DB_SET: + case DB_SET_RANGE: + case DB_GET_BOTH: + case DB_GET_BOTH_RANGE: + if ((ret = __qam_getno(dbp, key, &cp->recno)) != 0) + goto err; + break; + default: + ret = __db_unknown_flag(env, "__qamc_get", flags); + goto err; + } + + /* Don't hold the meta page long term. */ + if (locked) { + if ((ret = __LPUT(dbc, metalock)) != 0) + goto err; + locked = 0; + } + + if ((ret = __memp_fput(mpf, + dbc->thread_info, meta, dbc->priority)) != 0) + goto err; + meta = NULL; + + /* Lock the record. */ + if (((ret = __db_lget(dbc, LCK_COUPLE, cp->recno, lock_mode, + (with_delete && !inorder && !retrying) ? + DB_LOCK_NOWAIT | DB_LOCK_RECORD : DB_LOCK_RECORD, + &lock)) == DB_LOCK_DEADLOCK || ret == DB_LOCK_NOTGRANTED) && + with_delete) { +#ifdef QDEBUG + if (DBC_LOGGING(dbc)) + (void)__log_printf(env, + dbc->txn, "Queue S: %x %d %d", + dbc->locker ? dbc->locker->id : 0, + cp->recno, first); +#endif + first = 0; + if ((ret = + __db_lget(dbc, 0, metapno, meta_mode, 0, &metalock)) != 0) + goto err; + locked = 1; + if ((ret = __memp_fget(mpf, &metapno, + dbc->thread_info, dbc->txn, 0, &meta)) != 0) + goto err; + goto retry; + } + + if (ret != 0) + goto err; + + /* + * In the DB_FIRST or DB_LAST cases we must wait and then start over + * since the first/last may have moved while we slept. If we are + * reading in order and the first record was not there, we can skip it + * as it must have been aborted was was skipped by a non-queue insert + * or we could not have gotten its lock. If we have the wrong + * record we release our locks and try again. 
+ */ + switch (flags) { + default: + if (inorder) { + if (first != cp->recno) + break; + } else if (with_delete || !is_first) + break; + /* FALLTHROUGH */ + case DB_SET: + case DB_SET_RANGE: + case DB_GET_BOTH: + case DB_GET_BOTH_RANGE: + case DB_LAST: + if ((ret = + __db_lget(dbc, 0, metapno, meta_mode, 0, &metalock)) != 0) + goto lerr; + locked = 1; + if ((ret = __memp_fget(mpf, &metapno, + dbc->thread_info, dbc->txn, 0, &meta)) != 0) + goto lerr; + if ((is_first && cp->recno != meta->first_recno) || + (flags == DB_LAST && cp->recno != meta->cur_recno - 1)) { + if ((ret = __LPUT(dbc, lock)) != 0) + goto err; + if (is_first) + flags = DB_FIRST; + goto retry; + } else if (!is_first && flags != DB_LAST) { + if (QAM_BEFORE_FIRST(meta, cp->recno)) { + if (flags == DB_SET_RANGE || + flags == DB_GET_BOTH_RANGE) { + if ((ret = __LPUT(dbc, metalock)) != 0) + goto err; + locked = 0; + cp->lock = lock; + LOCK_INIT(lock); + goto release_retry; + } + ret = DB_NOTFOUND; + goto lerr; + } + if (QAM_AFTER_CURRENT(meta, cp->recno)) { + ret = DB_NOTFOUND; + goto lerr; + } + } + /* Don't hold the meta page long term. */ + if ((ret = __LPUT(dbc, metalock)) != 0) + goto err; + locked = 0; + if ((ret = __memp_fput(mpf, + dbc->thread_info, meta, dbc->priority)) != 0) + goto err; + meta = NULL; + } + + /* Position the cursor on the record. */ + if ((ret = __qam_position(dbc, &cp->recno, 0, &exact)) != 0) { + /* We cannot get the page, release the record lock. */ + (void)__LPUT(dbc, lock); + goto err; + } + + pg = cp->page; + cp->lock = lock; + cp->lock_mode = lock_mode; + LOCK_INIT(lock); + + if (!exact) { +release_retry: /* Release locks and retry, if possible. */ + if (pg != NULL) + (void)__qam_fput(dbc, cp->pgno, pg, dbc->priority); + cp->page = pg = NULL; + if (with_delete) { + if ((ret = __LPUT(dbc, cp->lock)) != 0) + goto err1; + } else if ((ret = __TLPUT(dbc, cp->lock)) != 0) + goto err1; + + if (locked == 0 && (ret = + __db_lget(dbc, 0, metapno, meta_mode, 0, &metalock)) != 0) + goto err1; + locked = 1; + if (meta == NULL && (ret = __memp_fget(mpf, &metapno, + dbc->thread_info, dbc->txn, 0, &meta)) != 0) + goto err1; + /* + * If we don't need locks and we are out of range + * then we can just skip to the FIRST/LAST record + * otherwise we must iterate to lock the records + * and get serializability. + */ + switch (flags) { + case DB_NEXT: + case DB_NEXT_NODUP: + if (!with_delete) + is_first = 0; + if (QAM_BEFORE_FIRST(meta, cp->recno) && + DONT_NEED_LOCKS(dbc)) + flags = DB_FIRST; + break; + case DB_LAST: + case DB_PREV: + case DB_PREV_NODUP: + if (QAM_AFTER_CURRENT(meta, cp->recno) && + DONT_NEED_LOCKS(dbc)) + flags = DB_LAST; + else + flags = DB_PREV; + break; + + case DB_GET_BOTH_RANGE: + case DB_SET_RANGE: + if (QAM_BEFORE_FIRST(meta, cp->recno) && + DONT_NEED_LOCKS(dbc)) + flags = DB_FIRST; + else + flags = DB_NEXT; + break; + + default: + /* this is for the SET and GET_BOTH cases */ + ret = DB_KEYEMPTY; + goto err1; + } + retrying = 0; + goto retry; + } + + qp = QAM_GET_RECORD(dbp, pg, cp->indx); + + /* Return the data item. */ + if (flags == DB_GET_BOTH || flags == DB_GET_BOTH_RANGE) { + /* + * Need to compare + */ + tmp.data = qp->data; + tmp.size = t->re_len; + if ((ret = __bam_defcmp(dbp, data, &tmp)) != 0) { + if (flags == DB_GET_BOTH_RANGE) + goto release_retry; + ret = DB_NOTFOUND; + goto err1; + } + } + + /* Return the key if the user didn't give us one. 
*/ + if (key != NULL && !F_ISSET(key, DB_DBT_ISSET)) { + if ((ret = __db_retcopy(dbp->env, + key, &cp->recno, sizeof(cp->recno), + &dbc->rkey->data, &dbc->rkey->ulen)) != 0) + goto err1; + F_SET(key, DB_DBT_ISSET); + } + + if (data != NULL && + !F_ISSET(dbc, DBC_MULTIPLE|DBC_MULTIPLE_KEY) && + !F_ISSET(data, DB_DBT_ISSET)) { + if ((ret = __db_retcopy(dbp->env, data, qp->data, t->re_len, + &dbc->rdata->data, &dbc->rdata->ulen)) != 0) + goto err1; + F_SET(data, DB_DBT_ISSET); + } + + /* Finally, if we are doing DB_CONSUME mark the record. */ + if (with_delete) { + /* + * Assert that we're not a secondary index. Doing a DB_CONSUME + * on a secondary makes very little sense, since one can't + * DB_APPEND there; attempting one should be forbidden by + * the interface. + */ + DB_ASSERT(env, !F_ISSET(dbp, DB_AM_SECONDARY)); + + /* + * If we have any secondary indices, call __dbc_del_primary to + * delete the references to the item we're about to delete. + * + * Note that we work on a duplicated cursor, since the + * __db_ret work has already been done, so it's not safe + * to perform any additional ops on this cursor. + */ + if (DB_IS_PRIMARY(dbp)) { + if ((ret = __dbc_idup(dbc, + &dbcdup, DB_POSITION)) != 0) + goto err1; + + if ((ret = __qam_fput(dbc, + cp->pgno, cp->page, dbc->priority)) != 0) + goto err1; + cp->page = NULL; + if (meta != NULL && + (ret = __memp_fput(mpf, + dbc->thread_info, meta, dbc->priority)) != 0) + goto err1; + meta = NULL; + if ((ret = __dbc_del_primary(dbcdup)) != 0) { + /* + * The __dbc_del_primary return is more + * interesting. + */ + (void)__dbc_close(dbcdup); + goto err1; + } + + if ((ret = __dbc_close(dbcdup)) != 0) + goto err1; + if ((ret = __qam_fget(dbc, + &cp->pgno, DB_MPOOL_DIRTY, &cp->page)) != 0) + goto err; + } else if ((ret = __qam_dirty(dbc, + cp->pgno, &cp->page, dbc->priority)) != 0) + goto err1; + + pg = cp->page; + + if (DBC_LOGGING(dbc)) { + if (t->page_ext == 0 || t->re_len == 0) { + if ((ret = __qam_del_log(dbp, dbc->txn, + &LSN(pg), 0, &LSN(pg), + pg->pgno, cp->indx, cp->recno)) != 0) + goto err1; + } else { + tmp.data = qp->data; + tmp.size = t->re_len; + if ((ret = __qam_delext_log(dbp, + dbc->txn, &LSN(pg), 0, &LSN(pg), + pg->pgno, cp->indx, cp->recno, &tmp)) != 0) + goto err1; + } + } else + LSN_NOT_LOGGED(LSN(pg)); + + F_CLR(qp, QAM_VALID); + if ((ret = __qam_fput(dbc, + cp->pgno, cp->page, dbc->priority)) != 0) + goto err; + cp->page = NULL; + + /* + * Now we need to update the metapage + * first pointer. If we have deleted + * the record that is pointed to by + * first_recno then we move it as far + * forward as we can without blocking. + * The metapage lock must be held for + * the whole scan otherwise someone could + * do a random insert behind where we are + * looking. + */ + + if (locked == 0 && (ret = __db_lget( + dbc, 0, metapno, meta_mode, 0, &metalock)) != 0) + goto err1; + locked = 1; + if (meta == NULL && + (ret = __memp_fget(mpf, + &metapno, dbc->thread_info, dbc->txn, 0, &meta)) != 0) + goto err1; + +#ifdef QDEBUG + if (DBC_LOGGING(dbc)) + (void)__log_printf(env, + dbc->txn, "Queue D: %x %d %d %d", + dbc->locker ? dbc->locker->id : 0, + cp->recno, first, meta->first_recno); +#endif + /* + * See if we deleted the "first" record. If + * first is zero then we skipped something, + * see if first_recno has been move passed + * that to the record that we deleted. 
+ */ + if (first == 0) + first = cp->recno; + if (first != meta->first_recno) + goto done; + + if ((ret = __qam_consume(dbc, meta, first)) != 0) + goto err1; + } + +done: +err1: if (cp->page != NULL) { + if ((t_ret = __qam_fput(dbc, + cp->pgno, cp->page, dbc->priority)) != 0 && ret == 0) + ret = t_ret; + + cp->page = NULL; + } + if (0) { +lerr: (void)__LPUT(dbc, lock); + } + +err: if (meta) { + /* Release the meta page. */ + if ((t_ret = __memp_fput(mpf, + dbc->thread_info, meta, dbc->priority)) != 0 && ret == 0) + ret = t_ret; + + /* Don't hold the meta page long term. */ + if (locked) + if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0) + ret = t_ret; + } + DB_ASSERT(env, !LOCK_ISSET(metalock)); + + return ((ret == DB_LOCK_NOTGRANTED && !F_ISSET(env->dbenv, + DB_ENV_TIME_NOTGRANTED)) ? DB_LOCK_DEADLOCK : ret); +} + +/* + * __qam_consume -- try to reset the head of the queue. + * + */ +static int +__qam_consume(dbc, meta, first) + DBC *dbc; + QMETA *meta; + db_recno_t first; +{ + DB *dbp; + DB_LOCK lock, save_lock; + DB_MPOOLFILE *mpf; + QUEUE_CURSOR *cp; + db_indx_t save_indx; + db_pgno_t save_page; + db_recno_t current, save_recno; + u_int32_t rec_extent; + int exact, ret, t_ret, wrapped; + + dbp = dbc->dbp; + mpf = dbp->mpf; + cp = (QUEUE_CURSOR *)dbc->internal; + ret = 0; + + save_page = cp->pgno; + save_indx = cp->indx; + save_recno = cp->recno; + save_lock = cp->lock; + + /* + * If we skipped some deleted records, we need to + * reposition on the first one. Get a lock + * in case someone is trying to put it back. + */ + if (first != cp->recno) { + ret = __db_lget(dbc, 0, first, DB_LOCK_READ, + DB_LOCK_NOWAIT | DB_LOCK_RECORD, &lock); + if (ret == DB_LOCK_NOTGRANTED || ret == DB_LOCK_DEADLOCK) { + ret = 0; + goto done; + } + if (ret != 0) + goto done; + if (cp->page != NULL && (ret = + __qam_fput(dbc, cp->pgno, cp->page, dbc->priority)) != 0) + goto done; + cp->page = NULL; + if ((ret = __qam_position(dbc, + &first, 0, &exact)) != 0 || exact != 0) { + (void)__LPUT(dbc, lock); + goto done; + } + if ((ret =__LPUT(dbc, lock)) != 0) + goto done; + } + + current = meta->cur_recno; + wrapped = 0; + if (first > current) + wrapped = 1; + rec_extent = meta->page_ext * meta->rec_page; + + /* Loop until we find a record or hit current */ + for (;;) { + /* + * Check to see if we are moving off the extent + * and remove the extent. + * If we are moving off a page we need to + * get rid of the buffer. + * Wait for the lagging readers to move off the + * page. + */ + if (rec_extent != 0 && + ((exact = (first % rec_extent == 0)) || + (first % meta->rec_page == 0) || + first == UINT32_MAX)) { + if (exact == 1 && (ret = __db_lget(dbc, + 0, cp->pgno, DB_LOCK_WRITE, 0, &cp->lock)) != 0) + break; +#ifdef QDEBUG + if (DBC_LOGGING(dbc)) + (void)__log_printf(dbp->env, dbc->txn, + "Queue R: %x %d %d %d", + dbc->locker ? dbc->locker->id : 0, + cp->pgno, first, meta->first_recno); +#endif + if (cp->page != NULL && (ret = __qam_fput(dbc, + cp->pgno, cp->page, DB_PRIORITY_VERY_LOW)) != 0) + break; + cp->page = NULL; + + if (exact == 1) { + ret = __qam_fremove(dbp, cp->pgno); + if ((t_ret = + __LPUT(dbc, cp->lock)) != 0 && ret == 0) + ret = t_ret; + } + if (ret != 0) + break; + } else if (cp->page != NULL && (ret = __qam_fput(dbc, + cp->pgno, cp->page, dbc->priority)) != 0) + break; + cp->page = NULL; + first++; + if (first == RECNO_OOB) { + wrapped = 0; + first++; + } + + /* + * LOOP EXIT when we come move to the current + * pointer. 
+ */ + if (!wrapped && first >= current) + break; + + ret = __db_lget(dbc, 0, first, DB_LOCK_READ, + DB_LOCK_NOWAIT | DB_LOCK_RECORD, &lock); + if (ret == DB_LOCK_NOTGRANTED || ret == DB_LOCK_DEADLOCK) { + ret = 0; + break; + } + if (ret != 0) + break; + + if ((ret = __qam_position(dbc, &first, 0, &exact)) != 0) { + (void)__LPUT(dbc, lock); + break; + } + if ((ret =__LPUT(dbc, lock)) != 0 || exact) { + if ((t_ret = __qam_fput(dbc, cp->pgno, + cp->page, dbc->priority)) != 0 && ret == 0) + ret = t_ret; + cp->page = NULL; + break; + } + } + + cp->pgno = save_page; + cp->indx = save_indx; + cp->recno = save_recno; + cp->lock = save_lock; + + /* + * We have advanced as far as we can. + * Advance first_recno to this point. + */ + if (ret == 0 && meta->first_recno != first) { + if ((ret = __memp_dirty(mpf, + &meta, dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0) + goto done; +#ifdef QDEBUG + if (DBC_LOGGING(dbc)) + (void)__log_printf(dbp->env, dbc->txn, + "Queue M: %x %d %d %d", + dbc->locker ? dbc->locker->id : 0, + cp->recno, first, meta->first_recno); +#endif + if (DBC_LOGGING(dbc)) { + if ((ret = __qam_incfirst_log(dbp, + dbc->txn, &meta->dbmeta.lsn, 0, + cp->recno, PGNO_BASE_MD)) != 0) + goto done; + } else + LSN_NOT_LOGGED(meta->dbmeta.lsn); + meta->first_recno = first; + } + +done: + return (ret); +} + +static int +__qam_bulk(dbc, data, flags) + DBC *dbc; + DBT *data; + u_int32_t flags; +{ + DB *dbp; + DB_LOCK metalock, rlock; + DB_MPOOLFILE *mpf; + PAGE *pg; + QAMDATA *qp; + QMETA *meta; + QUEUE_CURSOR *cp; + db_indx_t indx; + db_lockmode_t lkmode; + db_pgno_t metapno; + u_int32_t *endp, *offp; + u_int32_t pagesize, re_len, recs; + u_int8_t *dbuf, *dp, *np; + int exact, ret, t_ret, valid; + int is_key, need_pg, size, space; + + dbp = dbc->dbp; + mpf = dbp->mpf; + cp = (QUEUE_CURSOR *)dbc->internal; + + lkmode = F_ISSET(dbc, DBC_RMW) ? DB_LOCK_WRITE : DB_LOCK_READ; + + pagesize = dbp->pgsize; + re_len = ((QUEUE *)dbp->q_internal)->re_len; + recs = ((QUEUE *)dbp->q_internal)->rec_page; + metapno = ((QUEUE *)dbp->q_internal)->q_meta; + + is_key = LF_ISSET(DB_MULTIPLE_KEY) ? 1 : 0; + size = 0; + + if ((ret = __db_lget(dbc, 0, metapno, DB_LOCK_READ, 0, &metalock)) != 0) + return (ret); + if ((ret = __memp_fget(mpf, &metapno, + dbc->thread_info, dbc->txn, 0, &meta)) != 0) { + /* We did not fetch it, we can release the lock. */ + (void)__LPUT(dbc, metalock); + return (ret); + } + + dbuf = data->data; + np = dp = dbuf; + + /* Keep track of space that is left. There is an termination entry */ + space = (int)data->ulen; + space -= (int)sizeof(*offp); + + /* Build the offset/size table from the end up. */ + endp = (u_int32_t *)((u_int8_t *)dbuf + data->ulen); + endp--; + offp = endp; + /* Save the lock on the current position of the cursor. */ + rlock = cp->lock; + LOCK_INIT(cp->lock); + +next_pg: + /* Wrap around, skipping zero. */ + if (cp->recno == RECNO_OOB) + cp->recno++; + if ((ret = __qam_position(dbc, &cp->recno, 0, &exact)) != 0) + goto done; + + pg = cp->page; + indx = cp->indx; + need_pg = 1; + + do { + /* + * If this page is a nonexistent page at the end of an + * extent, pg may be NULL. A NULL page has no valid records, + * so just keep looping as though qp exists and isn't QAM_VALID; + * calling QAM_GET_RECORD is unsafe. + */ + valid = 0; + + if (pg != NULL) { + if ((ret = __db_lget(dbc, LCK_COUPLE, + cp->recno, lkmode, DB_LOCK_RECORD, &rlock)) != 0) + goto done; + qp = QAM_GET_RECORD(dbp, pg, indx); + if (F_ISSET(qp, QAM_VALID)) { + valid = 1; + space -= (int) + ((is_key ? 
3 : 2) * sizeof(*offp)); + if (space < 0) + goto get_space; + if (need_pg) { + dp = np; + size = (int)pagesize - QPAGE_SZ(dbp); + if (space < size) { +get_space: + if (offp == endp) { + data->size = (u_int32_t) + DB_ALIGN((u_int32_t) + size + pagesize, + sizeof(u_int32_t)); + ret = DB_BUFFER_SMALL; + break; + } + if (indx != 0) + indx--; + cp->recno--; + space = 0; + break; + } + memcpy(dp, + (u_int8_t *)pg + QPAGE_SZ(dbp), + (u_int)size); + need_pg = 0; + space -= size; + np += size; + } + if (is_key) + *offp-- = cp->recno; + *offp-- = (u_int32_t)((((u_int8_t *)qp - + (u_int8_t *)pg) - QPAGE_SZ(dbp)) + + (dp - dbuf) + SSZA(QAMDATA, data)); + *offp-- = re_len; + } + } + if (!valid && is_key == 0) { + *offp-- = 0; + *offp-- = 0; + } + cp->recno++; + } while (++indx < recs && cp->recno != RECNO_OOB && + !QAM_AFTER_CURRENT(meta, cp->recno)); + + if (cp->page != NULL) { + if ((t_ret = __qam_fput(dbc, + cp->pgno, cp->page, dbc->priority)) != 0 && ret == 0) + ret = t_ret; + cp->page = NULL; + } + + if (ret == 0 && space > 0 && + (indx >= recs || cp->recno == RECNO_OOB) && + !QAM_AFTER_CURRENT(meta, cp->recno)) + goto next_pg; + + /* + * Correct recno in two cases: + * 1) If we just wrapped fetch must start at record 1 not a FIRST. + * 2) We ran out of space exactly at the end of a page. + */ + if (cp->recno == RECNO_OOB || (space == 0 && indx == recs)) + cp->recno--; + + if (is_key == 1) + *offp = RECNO_OOB; + else + *offp = (u_int32_t)-1; + +done: /* Release the meta page. */ + if ((t_ret = __memp_fput(mpf, + dbc->thread_info, meta, dbc->priority)) != 0 && ret == 0) + ret = t_ret; + if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0) + ret = t_ret; + + cp->lock = rlock; + + return (ret); +} + +/* + * __qamc_close -- + * Close down the cursor from a single use. + */ +static int +__qamc_close(dbc, root_pgno, rmroot) + DBC *dbc; + db_pgno_t root_pgno; + int *rmroot; +{ + QUEUE_CURSOR *cp; + int ret; + + COMPQUIET(root_pgno, 0); + COMPQUIET(rmroot, NULL); + + cp = (QUEUE_CURSOR *)dbc->internal; + + /* Discard any locks not acquired inside of a transaction. */ + ret = __TLPUT(dbc, cp->lock); + + LOCK_INIT(cp->lock); + cp->page = NULL; + cp->pgno = PGNO_INVALID; + cp->indx = 0; + cp->lock_mode = DB_LOCK_NG; + cp->recno = RECNO_OOB; + cp->flags = 0; + + return (ret); +} + +/* + * __qamc_dup -- + * Duplicate a queue cursor, such that the new one holds appropriate + * locks for the position of the original. + * + * PUBLIC: int __qamc_dup __P((DBC *, DBC *)); + */ +int +__qamc_dup(orig_dbc, new_dbc) + DBC *orig_dbc, *new_dbc; +{ + QUEUE_CURSOR *orig, *new; + + orig = (QUEUE_CURSOR *)orig_dbc->internal; + new = (QUEUE_CURSOR *)new_dbc->internal; + + new->recno = orig->recno; + + return (0); +} + +/* + * __qamc_init + * + * PUBLIC: int __qamc_init __P((DBC *)); + */ +int +__qamc_init(dbc) + DBC *dbc; +{ + DB *dbp; + QUEUE_CURSOR *cp; + int ret; + + dbp = dbc->dbp; + + /* Allocate the internal structure. */ + cp = (QUEUE_CURSOR *)dbc->internal; + if (cp == NULL) { + if ((ret = + __os_calloc(dbp->env, 1, sizeof(QUEUE_CURSOR), &cp)) != 0) + return (ret); + dbc->internal = (DBC_INTERNAL *)cp; + } + + /* Initialize methods. 
*/ + dbc->close = dbc->c_close = __dbc_close_pp; + dbc->cmp = __dbc_cmp_pp; + dbc->count = dbc->c_count = __dbc_count_pp; + dbc->del = dbc->c_del = __dbc_del_pp; + dbc->dup = dbc->c_dup = __dbc_dup_pp; + dbc->get = dbc->c_get = __dbc_get_pp; + dbc->pget = dbc->c_pget = __dbc_pget_pp; + dbc->put = dbc->c_put = __dbc_put_pp; + dbc->am_bulk = __qam_bulk; + dbc->am_close = __qamc_close; + dbc->am_del = __qamc_del; + dbc->am_destroy = __qamc_destroy; + dbc->am_get = __qamc_get; + dbc->am_put = __qamc_put; + dbc->am_writelock = NULL; + + return (0); +} + +/* + * __qamc_destroy -- + * Close a single cursor -- internal version. + */ +static int +__qamc_destroy(dbc) + DBC *dbc; +{ + /* Discard the structures. */ + __os_free(dbc->env, dbc->internal); + + return (0); +} + +/* + * __qam_getno -- + * Check the user's record number. + */ +static int +__qam_getno(dbp, key, rep) + DB *dbp; + const DBT *key; + db_recno_t *rep; +{ + /* If passed an empty DBT from Java, key->data may be NULL */ + if (key->size != sizeof(db_recno_t)) { + __db_errx(dbp->env, "illegal record number size"); + return (EINVAL); + } + + if ((*rep = *(db_recno_t *)key->data) == 0) { + __db_errx(dbp->env, "illegal record number of 0"); + return (EINVAL); + } + return (0); +} + +/* + * __qam_truncate -- + * Truncate a queue database + * + * PUBLIC: int __qam_truncate __P((DBC *, u_int32_t *)); + */ +int +__qam_truncate(dbc, countp) + DBC *dbc; + u_int32_t *countp; +{ + DB *dbp; + DB_LOCK metalock; + DB_MPOOLFILE *mpf; + QMETA *meta; + db_pgno_t metapno; + u_int32_t count; + int ret, t_ret; + + dbp = dbc->dbp; + + /* Walk the queue, counting rows. */ + for (count = 0; + (ret = __qamc_get(dbc, NULL, NULL, DB_CONSUME, &metapno)) == 0;) + count++; + if (ret != DB_NOTFOUND) + return (ret); + + /* Update the meta page. */ + metapno = ((QUEUE *)dbp->q_internal)->q_meta; + if ((ret = + __db_lget(dbc, 0, metapno, DB_LOCK_WRITE, 0, &metalock)) != 0) + return (ret); + + mpf = dbp->mpf; + if ((ret = __memp_fget(mpf, &metapno, dbc->thread_info, dbc->txn, + DB_MPOOL_DIRTY, &meta)) != 0) { + /* We did not fetch it, we can release the lock. */ + (void)__LPUT(dbc, metalock); + return (ret); + } + /* Remove the last extent file. */ + if (meta->cur_recno > 1 && ((QUEUE *)dbp->q_internal)->page_ext != 0) { + if ((ret = __qam_fremove(dbp, + QAM_RECNO_PAGE(dbp, meta->cur_recno - 1))) != 0) + goto err; + } + + if (DBC_LOGGING(dbc)) { + ret = __qam_mvptr_log(dbp, dbc->txn, &meta->dbmeta.lsn, 0, + QAM_SETCUR | QAM_SETFIRST | QAM_TRUNCATE, meta->first_recno, + 1, meta->cur_recno, 1, &meta->dbmeta.lsn, PGNO_BASE_MD); + } else + LSN_NOT_LOGGED(meta->dbmeta.lsn); + if (ret == 0) + meta->first_recno = meta->cur_recno = 1; + +err: if ((t_ret = __memp_fput(mpf, + dbc->thread_info, meta, dbc->priority)) != 0 && ret == 0) + ret = t_ret; + if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0) + ret = t_ret; + + if (countp != NULL) + *countp = count; + + return (ret); +} + +/* + * __qam_delete -- + * Queue fast delete function. + * + * PUBLIC: int __qam_delete __P((DBC *, DBT *, u_int32_t)); + */ +int +__qam_delete(dbc, key, flags) + DBC *dbc; + DBT *key; + u_int32_t flags; +{ + QUEUE_CURSOR *cp; + int ret; + + cp = (QUEUE_CURSOR *)dbc->internal; + if ((ret = __qam_getno(dbc->dbp, key, &cp->recno)) != 0) + goto err; + + ret = __qamc_del(dbc, flags); + +err: return (ret); +} diff --git a/qam/qam.src b/qam/qam.src new file mode 100644 index 00000000..3896fbc3 --- /dev/null +++ b/qam/qam.src @@ -0,0 +1,90 @@ +/*- + * See the file LICENSE for redistribution information. 
+ * + * Copyright (c) 1999-2009 Oracle. All rights reserved. + * + * $Id$ + */ + +DBPRIVATE +PREFIX __qam + +INCLUDE #include "db_int.h" +INCLUDE #include "dbinc/crypto.h" +INCLUDE #include "dbinc/db_page.h" +INCLUDE #include "dbinc/db_dispatch.h" +INCLUDE #include "dbinc/db_am.h" +INCLUDE #include "dbinc/log.h" +INCLUDE #include "dbinc/qam.h" +INCLUDE #include "dbinc/txn.h" +INCLUDE + +/* + * incfirst + * Used when we increment first_recno. + */ +BEGIN incfirst 42 84 +DB fileid int32_t ld +ARG recno db_recno_t lu +ARG meta_pgno db_pgno_t lu +END + +/* + * mvptr + * Used when we change one or both of cur_recno and first_recno. + */ +BEGIN mvptr 42 85 +ARG opcode u_int32_t lu +DB fileid int32_t ld +ARG old_first db_recno_t lu +ARG new_first db_recno_t lu +ARG old_cur db_recno_t lu +ARG new_cur db_recno_t lu +POINTER metalsn DB_LSN * lu +ARG meta_pgno db_pgno_t lu +END + + +/* + * del + * Used when we delete a record. + * recno is the record that is being deleted. + */ +BEGIN del 42 79 +DB fileid int32_t ld +POINTER lsn DB_LSN * lu +ARG pgno db_pgno_t lu +ARG indx u_int32_t lu +ARG recno db_recno_t lu +END + +/* + * add + * Used when we put a record on a page. + * recno is the record being added. + * data is the record itself. + */ +BEGIN add 42 80 +DB fileid int32_t ld +POINTER lsn DB_LSN * lu +ARG pgno db_pgno_t lu +ARG indx u_int32_t lu +ARG recno db_recno_t lu +DBT data DBT s +ARG vflag u_int32_t lu +DBT olddata DBT s +END + +/* + * delext + * Used when we delete a record in extent based queue. + * recno is the record that is being deleted. + */ +BEGIN delext 42 83 +DB fileid int32_t ld +POINTER lsn DB_LSN * lu +ARG pgno db_pgno_t lu +ARG indx u_int32_t lu +ARG recno db_recno_t lu +DBT data DBT s +END diff --git a/qam/qam_auto.c b/qam/qam_auto.c new file mode 100644 index 00000000..44136a23 --- /dev/null +++ b/qam/qam_auto.c @@ -0,0 +1,1310 @@ +/* Do not edit: automatically built by gen_rec.awk. 
*/ + +#include "db_config.h" +#include "db_int.h" +#include "dbinc/crypto.h" +#include "dbinc/db_page.h" +#include "dbinc/db_dispatch.h" +#include "dbinc/db_am.h" +#include "dbinc/log.h" +#include "dbinc/qam.h" +#include "dbinc/txn.h" + +/* + * PUBLIC: int __qam_incfirst_read __P((ENV *, DB **, void *, + * PUBLIC: void *, __qam_incfirst_args **)); + */ +int +__qam_incfirst_read(env, dbpp, td, recbuf, argpp) + ENV *env; + DB **dbpp; + void *td; + void *recbuf; + __qam_incfirst_args **argpp; +{ + __qam_incfirst_args *argp; + u_int32_t uinttmp; + u_int8_t *bp; + int ret; + + if ((ret = __os_malloc(env, + sizeof(__qam_incfirst_args) + sizeof(DB_TXN), &argp)) != 0) + return (ret); + bp = recbuf; + argp->txnp = (DB_TXN *)&argp[1]; + memset(argp->txnp, 0, sizeof(DB_TXN)); + + argp->txnp->td = td; + LOGCOPY_32(env, &argp->type, bp); + bp += sizeof(argp->type); + + LOGCOPY_32(env, &argp->txnp->txnid, bp); + bp += sizeof(argp->txnp->txnid); + + LOGCOPY_TOLSN(env, &argp->prev_lsn, bp); + bp += sizeof(DB_LSN); + + LOGCOPY_32(env, &uinttmp, bp); + argp->fileid = (int32_t)uinttmp; + bp += sizeof(uinttmp); + if (dbpp != NULL) { + *dbpp = NULL; + ret = __dbreg_id_to_db( + env, argp->txnp, dbpp, argp->fileid, 1); + } + + LOGCOPY_32(env, &uinttmp, bp); + argp->recno = (db_recno_t)uinttmp; + bp += sizeof(uinttmp); + + LOGCOPY_32(env, &uinttmp, bp); + argp->meta_pgno = (db_pgno_t)uinttmp; + bp += sizeof(uinttmp); + + *argpp = argp; + return (ret); +} + +/* + * PUBLIC: int __qam_incfirst_log __P((DB *, DB_TXN *, DB_LSN *, + * PUBLIC: u_int32_t, db_recno_t, db_pgno_t)); + */ +int +__qam_incfirst_log(dbp, txnp, ret_lsnp, flags, recno, meta_pgno) + DB *dbp; + DB_TXN *txnp; + DB_LSN *ret_lsnp; + u_int32_t flags; + db_recno_t recno; + db_pgno_t meta_pgno; +{ + DBT logrec; + DB_LSN *lsnp, null_lsn, *rlsnp; + DB_TXNLOGREC *lr; + ENV *env; + u_int32_t uinttmp, rectype, txn_num; + u_int npad; + u_int8_t *bp; + int is_durable, ret; + + COMPQUIET(lr, NULL); + + env = dbp->env; + rlsnp = ret_lsnp; + rectype = DB___qam_incfirst; + npad = 0; + ret = 0; + + if (LF_ISSET(DB_LOG_NOT_DURABLE) || + F_ISSET(dbp, DB_AM_NOT_DURABLE)) { + if (txnp == NULL) + return (0); + is_durable = 0; + } else + is_durable = 1; + + if (txnp == NULL) { + txn_num = 0; + lsnp = &null_lsn; + null_lsn.file = null_lsn.offset = 0; + } else { + if (TAILQ_FIRST(&txnp->kids) != NULL && + (ret = __txn_activekids(env, rectype, txnp)) != 0) + return (ret); + /* + * We need to assign begin_lsn while holding region mutex. + * That assignment is done inside the DbEnv->log_put call, + * so pass in the appropriate memory location to be filled + * in by the log_put code. 
+ */ + DB_SET_TXN_LSNP(txnp, &rlsnp, &lsnp); + txn_num = txnp->txnid; + } + + DB_ASSERT(env, dbp->log_filename != NULL); + if (dbp->log_filename->id == DB_LOGFILEID_INVALID && + (ret = __dbreg_lazy_id(dbp)) != 0) + return (ret); + + logrec.size = sizeof(rectype) + sizeof(txn_num) + sizeof(DB_LSN) + + sizeof(u_int32_t) + + sizeof(u_int32_t) + + sizeof(u_int32_t); + if (CRYPTO_ON(env)) { + npad = env->crypto_handle->adj_size(logrec.size); + logrec.size += npad; + } + + if (is_durable || txnp == NULL) { + if ((ret = + __os_malloc(env, logrec.size, &logrec.data)) != 0) + return (ret); + } else { + if ((ret = __os_malloc(env, + logrec.size + sizeof(DB_TXNLOGREC), &lr)) != 0) + return (ret); +#ifdef DIAGNOSTIC + if ((ret = + __os_malloc(env, logrec.size, &logrec.data)) != 0) { + __os_free(env, lr); + return (ret); + } +#else + logrec.data = lr->data; +#endif + } + if (npad > 0) + memset((u_int8_t *)logrec.data + logrec.size - npad, 0, npad); + + bp = logrec.data; + + LOGCOPY_32(env, bp, &rectype); + bp += sizeof(rectype); + + LOGCOPY_32(env, bp, &txn_num); + bp += sizeof(txn_num); + + LOGCOPY_FROMLSN(env, bp, lsnp); + bp += sizeof(DB_LSN); + + uinttmp = (u_int32_t)dbp->log_filename->id; + LOGCOPY_32(env, bp, &uinttmp); + bp += sizeof(uinttmp); + + uinttmp = (u_int32_t)recno; + LOGCOPY_32(env,bp, &uinttmp); + bp += sizeof(uinttmp); + + uinttmp = (u_int32_t)meta_pgno; + LOGCOPY_32(env,bp, &uinttmp); + bp += sizeof(uinttmp); + + DB_ASSERT(env, + (u_int32_t)(bp - (u_int8_t *)logrec.data) <= logrec.size); + + if (is_durable || txnp == NULL) { + if ((ret = __log_put(env, rlsnp,(DBT *)&logrec, + flags | DB_LOG_NOCOPY)) == 0 && txnp != NULL) { + *lsnp = *rlsnp; + if (rlsnp != ret_lsnp) + *ret_lsnp = *rlsnp; + } + } else { + ret = 0; +#ifdef DIAGNOSTIC + /* + * Set the debug bit if we are going to log non-durable + * transactions so they will be ignored by recovery. 
+ */ + memcpy(lr->data, logrec.data, logrec.size); + rectype |= DB_debug_FLAG; + LOGCOPY_32(env, logrec.data, &rectype); + + if (!IS_REP_CLIENT(env)) + ret = __log_put(env, + rlsnp, (DBT *)&logrec, flags | DB_LOG_NOCOPY); +#endif + STAILQ_INSERT_HEAD(&txnp->logs, lr, links); + F_SET((TXN_DETAIL *)txnp->td, TXN_DTL_INMEMORY); + LSN_NOT_LOGGED(*ret_lsnp); + } + +#ifdef LOG_DIAGNOSTIC + if (ret != 0) + (void)__qam_incfirst_print(env, + (DBT *)&logrec, ret_lsnp, DB_TXN_PRINT, NULL); +#endif + +#ifdef DIAGNOSTIC + __os_free(env, logrec.data); +#else + if (is_durable || txnp == NULL) + __os_free(env, logrec.data); +#endif + return (ret); +} + +/* + * PUBLIC: int __qam_mvptr_read __P((ENV *, DB **, void *, void *, + * PUBLIC: __qam_mvptr_args **)); + */ +int +__qam_mvptr_read(env, dbpp, td, recbuf, argpp) + ENV *env; + DB **dbpp; + void *td; + void *recbuf; + __qam_mvptr_args **argpp; +{ + __qam_mvptr_args *argp; + u_int32_t uinttmp; + u_int8_t *bp; + int ret; + + if ((ret = __os_malloc(env, + sizeof(__qam_mvptr_args) + sizeof(DB_TXN), &argp)) != 0) + return (ret); + bp = recbuf; + argp->txnp = (DB_TXN *)&argp[1]; + memset(argp->txnp, 0, sizeof(DB_TXN)); + + argp->txnp->td = td; + LOGCOPY_32(env, &argp->type, bp); + bp += sizeof(argp->type); + + LOGCOPY_32(env, &argp->txnp->txnid, bp); + bp += sizeof(argp->txnp->txnid); + + LOGCOPY_TOLSN(env, &argp->prev_lsn, bp); + bp += sizeof(DB_LSN); + + LOGCOPY_32(env, &argp->opcode, bp); + bp += sizeof(argp->opcode); + + LOGCOPY_32(env, &uinttmp, bp); + argp->fileid = (int32_t)uinttmp; + bp += sizeof(uinttmp); + if (dbpp != NULL) { + *dbpp = NULL; + ret = __dbreg_id_to_db( + env, argp->txnp, dbpp, argp->fileid, 1); + } + + LOGCOPY_32(env, &uinttmp, bp); + argp->old_first = (db_recno_t)uinttmp; + bp += sizeof(uinttmp); + + LOGCOPY_32(env, &uinttmp, bp); + argp->new_first = (db_recno_t)uinttmp; + bp += sizeof(uinttmp); + + LOGCOPY_32(env, &uinttmp, bp); + argp->old_cur = (db_recno_t)uinttmp; + bp += sizeof(uinttmp); + + LOGCOPY_32(env, &uinttmp, bp); + argp->new_cur = (db_recno_t)uinttmp; + bp += sizeof(uinttmp); + + LOGCOPY_TOLSN(env, &argp->metalsn, bp); + bp += sizeof(DB_LSN); + + LOGCOPY_32(env, &uinttmp, bp); + argp->meta_pgno = (db_pgno_t)uinttmp; + bp += sizeof(uinttmp); + + *argpp = argp; + return (ret); +} + +/* + * PUBLIC: int __qam_mvptr_log __P((DB *, DB_TXN *, DB_LSN *, + * PUBLIC: u_int32_t, u_int32_t, db_recno_t, db_recno_t, db_recno_t, + * PUBLIC: db_recno_t, DB_LSN *, db_pgno_t)); + */ +int +__qam_mvptr_log(dbp, txnp, ret_lsnp, flags, + opcode, old_first, new_first, old_cur, new_cur, + metalsn, meta_pgno) + DB *dbp; + DB_TXN *txnp; + DB_LSN *ret_lsnp; + u_int32_t flags; + u_int32_t opcode; + db_recno_t old_first; + db_recno_t new_first; + db_recno_t old_cur; + db_recno_t new_cur; + DB_LSN * metalsn; + db_pgno_t meta_pgno; +{ + DBT logrec; + DB_LSN *lsnp, null_lsn, *rlsnp; + DB_TXNLOGREC *lr; + ENV *env; + u_int32_t uinttmp, rectype, txn_num; + u_int npad; + u_int8_t *bp; + int is_durable, ret; + + COMPQUIET(lr, NULL); + + env = dbp->env; + rlsnp = ret_lsnp; + rectype = DB___qam_mvptr; + npad = 0; + ret = 0; + + if (LF_ISSET(DB_LOG_NOT_DURABLE) || + F_ISSET(dbp, DB_AM_NOT_DURABLE)) { + if (txnp == NULL) + return (0); + is_durable = 0; + } else + is_durable = 1; + + if (txnp == NULL) { + txn_num = 0; + lsnp = &null_lsn; + null_lsn.file = null_lsn.offset = 0; + } else { + if (TAILQ_FIRST(&txnp->kids) != NULL && + (ret = __txn_activekids(env, rectype, txnp)) != 0) + return (ret); + /* + * We need to assign begin_lsn while holding region mutex. 
+ * That assignment is done inside the DbEnv->log_put call, + * so pass in the appropriate memory location to be filled + * in by the log_put code. + */ + DB_SET_TXN_LSNP(txnp, &rlsnp, &lsnp); + txn_num = txnp->txnid; + } + + DB_ASSERT(env, dbp->log_filename != NULL); + if (dbp->log_filename->id == DB_LOGFILEID_INVALID && + (ret = __dbreg_lazy_id(dbp)) != 0) + return (ret); + + logrec.size = sizeof(rectype) + sizeof(txn_num) + sizeof(DB_LSN) + + sizeof(u_int32_t) + + sizeof(u_int32_t) + + sizeof(u_int32_t) + + sizeof(u_int32_t) + + sizeof(u_int32_t) + + sizeof(u_int32_t) + + sizeof(*metalsn) + + sizeof(u_int32_t); + if (CRYPTO_ON(env)) { + npad = env->crypto_handle->adj_size(logrec.size); + logrec.size += npad; + } + + if (is_durable || txnp == NULL) { + if ((ret = + __os_malloc(env, logrec.size, &logrec.data)) != 0) + return (ret); + } else { + if ((ret = __os_malloc(env, + logrec.size + sizeof(DB_TXNLOGREC), &lr)) != 0) + return (ret); +#ifdef DIAGNOSTIC + if ((ret = + __os_malloc(env, logrec.size, &logrec.data)) != 0) { + __os_free(env, lr); + return (ret); + } +#else + logrec.data = lr->data; +#endif + } + if (npad > 0) + memset((u_int8_t *)logrec.data + logrec.size - npad, 0, npad); + + bp = logrec.data; + + LOGCOPY_32(env, bp, &rectype); + bp += sizeof(rectype); + + LOGCOPY_32(env, bp, &txn_num); + bp += sizeof(txn_num); + + LOGCOPY_FROMLSN(env, bp, lsnp); + bp += sizeof(DB_LSN); + + LOGCOPY_32(env, bp, &opcode); + bp += sizeof(opcode); + + uinttmp = (u_int32_t)dbp->log_filename->id; + LOGCOPY_32(env, bp, &uinttmp); + bp += sizeof(uinttmp); + + uinttmp = (u_int32_t)old_first; + LOGCOPY_32(env,bp, &uinttmp); + bp += sizeof(uinttmp); + + uinttmp = (u_int32_t)new_first; + LOGCOPY_32(env,bp, &uinttmp); + bp += sizeof(uinttmp); + + uinttmp = (u_int32_t)old_cur; + LOGCOPY_32(env,bp, &uinttmp); + bp += sizeof(uinttmp); + + uinttmp = (u_int32_t)new_cur; + LOGCOPY_32(env,bp, &uinttmp); + bp += sizeof(uinttmp); + + if (metalsn != NULL) { + if (txnp != NULL) { + LOG *lp = env->lg_handle->reginfo.primary; + if (LOG_COMPARE(metalsn, &lp->lsn) >= 0 && (ret = + __log_check_page_lsn(env, dbp, metalsn)) != 0) + return (ret); + } + LOGCOPY_FROMLSN(env, bp, metalsn); + } else + memset(bp, 0, sizeof(*metalsn)); + bp += sizeof(*metalsn); + + uinttmp = (u_int32_t)meta_pgno; + LOGCOPY_32(env,bp, &uinttmp); + bp += sizeof(uinttmp); + + DB_ASSERT(env, + (u_int32_t)(bp - (u_int8_t *)logrec.data) <= logrec.size); + + if (is_durable || txnp == NULL) { + if ((ret = __log_put(env, rlsnp,(DBT *)&logrec, + flags | DB_LOG_NOCOPY)) == 0 && txnp != NULL) { + *lsnp = *rlsnp; + if (rlsnp != ret_lsnp) + *ret_lsnp = *rlsnp; + } + } else { + ret = 0; +#ifdef DIAGNOSTIC + /* + * Set the debug bit if we are going to log non-durable + * transactions so they will be ignored by recovery. 
+ */ + memcpy(lr->data, logrec.data, logrec.size); + rectype |= DB_debug_FLAG; + LOGCOPY_32(env, logrec.data, &rectype); + + if (!IS_REP_CLIENT(env)) + ret = __log_put(env, + rlsnp, (DBT *)&logrec, flags | DB_LOG_NOCOPY); +#endif + STAILQ_INSERT_HEAD(&txnp->logs, lr, links); + F_SET((TXN_DETAIL *)txnp->td, TXN_DTL_INMEMORY); + LSN_NOT_LOGGED(*ret_lsnp); + } + +#ifdef LOG_DIAGNOSTIC + if (ret != 0) + (void)__qam_mvptr_print(env, + (DBT *)&logrec, ret_lsnp, DB_TXN_PRINT, NULL); +#endif + +#ifdef DIAGNOSTIC + __os_free(env, logrec.data); +#else + if (is_durable || txnp == NULL) + __os_free(env, logrec.data); +#endif + return (ret); +} + +/* + * PUBLIC: int __qam_del_read __P((ENV *, DB **, void *, void *, + * PUBLIC: __qam_del_args **)); + */ +int +__qam_del_read(env, dbpp, td, recbuf, argpp) + ENV *env; + DB **dbpp; + void *td; + void *recbuf; + __qam_del_args **argpp; +{ + __qam_del_args *argp; + u_int32_t uinttmp; + u_int8_t *bp; + int ret; + + if ((ret = __os_malloc(env, + sizeof(__qam_del_args) + sizeof(DB_TXN), &argp)) != 0) + return (ret); + bp = recbuf; + argp->txnp = (DB_TXN *)&argp[1]; + memset(argp->txnp, 0, sizeof(DB_TXN)); + + argp->txnp->td = td; + LOGCOPY_32(env, &argp->type, bp); + bp += sizeof(argp->type); + + LOGCOPY_32(env, &argp->txnp->txnid, bp); + bp += sizeof(argp->txnp->txnid); + + LOGCOPY_TOLSN(env, &argp->prev_lsn, bp); + bp += sizeof(DB_LSN); + + LOGCOPY_32(env, &uinttmp, bp); + argp->fileid = (int32_t)uinttmp; + bp += sizeof(uinttmp); + if (dbpp != NULL) { + *dbpp = NULL; + ret = __dbreg_id_to_db( + env, argp->txnp, dbpp, argp->fileid, 1); + } + + LOGCOPY_TOLSN(env, &argp->lsn, bp); + bp += sizeof(DB_LSN); + + LOGCOPY_32(env, &uinttmp, bp); + argp->pgno = (db_pgno_t)uinttmp; + bp += sizeof(uinttmp); + + LOGCOPY_32(env, &argp->indx, bp); + bp += sizeof(argp->indx); + + LOGCOPY_32(env, &uinttmp, bp); + argp->recno = (db_recno_t)uinttmp; + bp += sizeof(uinttmp); + + *argpp = argp; + return (ret); +} + +/* + * PUBLIC: int __qam_del_log __P((DB *, DB_TXN *, DB_LSN *, + * PUBLIC: u_int32_t, DB_LSN *, db_pgno_t, u_int32_t, db_recno_t)); + */ +int +__qam_del_log(dbp, txnp, ret_lsnp, flags, lsn, pgno, indx, recno) + DB *dbp; + DB_TXN *txnp; + DB_LSN *ret_lsnp; + u_int32_t flags; + DB_LSN * lsn; + db_pgno_t pgno; + u_int32_t indx; + db_recno_t recno; +{ + DBT logrec; + DB_LSN *lsnp, null_lsn, *rlsnp; + DB_TXNLOGREC *lr; + ENV *env; + u_int32_t uinttmp, rectype, txn_num; + u_int npad; + u_int8_t *bp; + int is_durable, ret; + + COMPQUIET(lr, NULL); + + env = dbp->env; + rlsnp = ret_lsnp; + rectype = DB___qam_del; + npad = 0; + ret = 0; + + if (LF_ISSET(DB_LOG_NOT_DURABLE) || + F_ISSET(dbp, DB_AM_NOT_DURABLE)) { + if (txnp == NULL) + return (0); + is_durable = 0; + } else + is_durable = 1; + + if (txnp == NULL) { + txn_num = 0; + lsnp = &null_lsn; + null_lsn.file = null_lsn.offset = 0; + } else { + if (TAILQ_FIRST(&txnp->kids) != NULL && + (ret = __txn_activekids(env, rectype, txnp)) != 0) + return (ret); + /* + * We need to assign begin_lsn while holding region mutex. + * That assignment is done inside the DbEnv->log_put call, + * so pass in the appropriate memory location to be filled + * in by the log_put code. 
+ */ + DB_SET_TXN_LSNP(txnp, &rlsnp, &lsnp); + txn_num = txnp->txnid; + } + + DB_ASSERT(env, dbp->log_filename != NULL); + if (dbp->log_filename->id == DB_LOGFILEID_INVALID && + (ret = __dbreg_lazy_id(dbp)) != 0) + return (ret); + + logrec.size = sizeof(rectype) + sizeof(txn_num) + sizeof(DB_LSN) + + sizeof(u_int32_t) + + sizeof(*lsn) + + sizeof(u_int32_t) + + sizeof(u_int32_t) + + sizeof(u_int32_t); + if (CRYPTO_ON(env)) { + npad = env->crypto_handle->adj_size(logrec.size); + logrec.size += npad; + } + + if (is_durable || txnp == NULL) { + if ((ret = + __os_malloc(env, logrec.size, &logrec.data)) != 0) + return (ret); + } else { + if ((ret = __os_malloc(env, + logrec.size + sizeof(DB_TXNLOGREC), &lr)) != 0) + return (ret); +#ifdef DIAGNOSTIC + if ((ret = + __os_malloc(env, logrec.size, &logrec.data)) != 0) { + __os_free(env, lr); + return (ret); + } +#else + logrec.data = lr->data; +#endif + } + if (npad > 0) + memset((u_int8_t *)logrec.data + logrec.size - npad, 0, npad); + + bp = logrec.data; + + LOGCOPY_32(env, bp, &rectype); + bp += sizeof(rectype); + + LOGCOPY_32(env, bp, &txn_num); + bp += sizeof(txn_num); + + LOGCOPY_FROMLSN(env, bp, lsnp); + bp += sizeof(DB_LSN); + + uinttmp = (u_int32_t)dbp->log_filename->id; + LOGCOPY_32(env, bp, &uinttmp); + bp += sizeof(uinttmp); + + if (lsn != NULL) { + if (txnp != NULL) { + LOG *lp = env->lg_handle->reginfo.primary; + if (LOG_COMPARE(lsn, &lp->lsn) >= 0 && (ret = + __log_check_page_lsn(env, dbp, lsn)) != 0) + return (ret); + } + LOGCOPY_FROMLSN(env, bp, lsn); + } else + memset(bp, 0, sizeof(*lsn)); + bp += sizeof(*lsn); + + uinttmp = (u_int32_t)pgno; + LOGCOPY_32(env,bp, &uinttmp); + bp += sizeof(uinttmp); + + LOGCOPY_32(env, bp, &indx); + bp += sizeof(indx); + + uinttmp = (u_int32_t)recno; + LOGCOPY_32(env,bp, &uinttmp); + bp += sizeof(uinttmp); + + DB_ASSERT(env, + (u_int32_t)(bp - (u_int8_t *)logrec.data) <= logrec.size); + + if (is_durable || txnp == NULL) { + if ((ret = __log_put(env, rlsnp,(DBT *)&logrec, + flags | DB_LOG_NOCOPY)) == 0 && txnp != NULL) { + *lsnp = *rlsnp; + if (rlsnp != ret_lsnp) + *ret_lsnp = *rlsnp; + } + } else { + ret = 0; +#ifdef DIAGNOSTIC + /* + * Set the debug bit if we are going to log non-durable + * transactions so they will be ignored by recovery. 
+ */ + memcpy(lr->data, logrec.data, logrec.size); + rectype |= DB_debug_FLAG; + LOGCOPY_32(env, logrec.data, &rectype); + + if (!IS_REP_CLIENT(env)) + ret = __log_put(env, + rlsnp, (DBT *)&logrec, flags | DB_LOG_NOCOPY); +#endif + STAILQ_INSERT_HEAD(&txnp->logs, lr, links); + F_SET((TXN_DETAIL *)txnp->td, TXN_DTL_INMEMORY); + LSN_NOT_LOGGED(*ret_lsnp); + } + +#ifdef LOG_DIAGNOSTIC + if (ret != 0) + (void)__qam_del_print(env, + (DBT *)&logrec, ret_lsnp, DB_TXN_PRINT, NULL); +#endif + +#ifdef DIAGNOSTIC + __os_free(env, logrec.data); +#else + if (is_durable || txnp == NULL) + __os_free(env, logrec.data); +#endif + return (ret); +} + +/* + * PUBLIC: int __qam_add_read __P((ENV *, DB **, void *, void *, + * PUBLIC: __qam_add_args **)); + */ +int +__qam_add_read(env, dbpp, td, recbuf, argpp) + ENV *env; + DB **dbpp; + void *td; + void *recbuf; + __qam_add_args **argpp; +{ + __qam_add_args *argp; + u_int32_t uinttmp; + u_int8_t *bp; + int ret; + + if ((ret = __os_malloc(env, + sizeof(__qam_add_args) + sizeof(DB_TXN), &argp)) != 0) + return (ret); + bp = recbuf; + argp->txnp = (DB_TXN *)&argp[1]; + memset(argp->txnp, 0, sizeof(DB_TXN)); + + argp->txnp->td = td; + LOGCOPY_32(env, &argp->type, bp); + bp += sizeof(argp->type); + + LOGCOPY_32(env, &argp->txnp->txnid, bp); + bp += sizeof(argp->txnp->txnid); + + LOGCOPY_TOLSN(env, &argp->prev_lsn, bp); + bp += sizeof(DB_LSN); + + LOGCOPY_32(env, &uinttmp, bp); + argp->fileid = (int32_t)uinttmp; + bp += sizeof(uinttmp); + if (dbpp != NULL) { + *dbpp = NULL; + ret = __dbreg_id_to_db( + env, argp->txnp, dbpp, argp->fileid, 1); + } + + LOGCOPY_TOLSN(env, &argp->lsn, bp); + bp += sizeof(DB_LSN); + + LOGCOPY_32(env, &uinttmp, bp); + argp->pgno = (db_pgno_t)uinttmp; + bp += sizeof(uinttmp); + + LOGCOPY_32(env, &argp->indx, bp); + bp += sizeof(argp->indx); + + LOGCOPY_32(env, &uinttmp, bp); + argp->recno = (db_recno_t)uinttmp; + bp += sizeof(uinttmp); + + memset(&argp->data, 0, sizeof(argp->data)); + LOGCOPY_32(env,&argp->data.size, bp); + bp += sizeof(u_int32_t); + argp->data.data = bp; + bp += argp->data.size; + + LOGCOPY_32(env, &argp->vflag, bp); + bp += sizeof(argp->vflag); + + memset(&argp->olddata, 0, sizeof(argp->olddata)); + LOGCOPY_32(env,&argp->olddata.size, bp); + bp += sizeof(u_int32_t); + argp->olddata.data = bp; + bp += argp->olddata.size; + + *argpp = argp; + return (ret); +} + +/* + * PUBLIC: int __qam_add_log __P((DB *, DB_TXN *, DB_LSN *, + * PUBLIC: u_int32_t, DB_LSN *, db_pgno_t, u_int32_t, db_recno_t, + * PUBLIC: const DBT *, u_int32_t, const DBT *)); + */ +int +__qam_add_log(dbp, txnp, ret_lsnp, flags, lsn, pgno, indx, recno, data, + vflag, olddata) + DB *dbp; + DB_TXN *txnp; + DB_LSN *ret_lsnp; + u_int32_t flags; + DB_LSN * lsn; + db_pgno_t pgno; + u_int32_t indx; + db_recno_t recno; + const DBT *data; + u_int32_t vflag; + const DBT *olddata; +{ + DBT logrec; + DB_LSN *lsnp, null_lsn, *rlsnp; + DB_TXNLOGREC *lr; + ENV *env; + u_int32_t zero, uinttmp, rectype, txn_num; + u_int npad; + u_int8_t *bp; + int is_durable, ret; + + COMPQUIET(lr, NULL); + + env = dbp->env; + rlsnp = ret_lsnp; + rectype = DB___qam_add; + npad = 0; + ret = 0; + + if (LF_ISSET(DB_LOG_NOT_DURABLE) || + F_ISSET(dbp, DB_AM_NOT_DURABLE)) { + if (txnp == NULL) + return (0); + is_durable = 0; + } else + is_durable = 1; + + if (txnp == NULL) { + txn_num = 0; + lsnp = &null_lsn; + null_lsn.file = null_lsn.offset = 0; + } else { + if (TAILQ_FIRST(&txnp->kids) != NULL && + (ret = __txn_activekids(env, rectype, txnp)) != 0) + return (ret); + /* + * We need to assign 
begin_lsn while holding region mutex. + * That assignment is done inside the DbEnv->log_put call, + * so pass in the appropriate memory location to be filled + * in by the log_put code. + */ + DB_SET_TXN_LSNP(txnp, &rlsnp, &lsnp); + txn_num = txnp->txnid; + } + + DB_ASSERT(env, dbp->log_filename != NULL); + if (dbp->log_filename->id == DB_LOGFILEID_INVALID && + (ret = __dbreg_lazy_id(dbp)) != 0) + return (ret); + + logrec.size = sizeof(rectype) + sizeof(txn_num) + sizeof(DB_LSN) + + sizeof(u_int32_t) + + sizeof(*lsn) + + sizeof(u_int32_t) + + sizeof(u_int32_t) + + sizeof(u_int32_t) + + sizeof(u_int32_t) + (data == NULL ? 0 : data->size) + + sizeof(u_int32_t) + + sizeof(u_int32_t) + (olddata == NULL ? 0 : olddata->size); + if (CRYPTO_ON(env)) { + npad = env->crypto_handle->adj_size(logrec.size); + logrec.size += npad; + } + + if (is_durable || txnp == NULL) { + if ((ret = + __os_malloc(env, logrec.size, &logrec.data)) != 0) + return (ret); + } else { + if ((ret = __os_malloc(env, + logrec.size + sizeof(DB_TXNLOGREC), &lr)) != 0) + return (ret); +#ifdef DIAGNOSTIC + if ((ret = + __os_malloc(env, logrec.size, &logrec.data)) != 0) { + __os_free(env, lr); + return (ret); + } +#else + logrec.data = lr->data; +#endif + } + if (npad > 0) + memset((u_int8_t *)logrec.data + logrec.size - npad, 0, npad); + + bp = logrec.data; + + LOGCOPY_32(env, bp, &rectype); + bp += sizeof(rectype); + + LOGCOPY_32(env, bp, &txn_num); + bp += sizeof(txn_num); + + LOGCOPY_FROMLSN(env, bp, lsnp); + bp += sizeof(DB_LSN); + + uinttmp = (u_int32_t)dbp->log_filename->id; + LOGCOPY_32(env, bp, &uinttmp); + bp += sizeof(uinttmp); + + if (lsn != NULL) { + if (txnp != NULL) { + LOG *lp = env->lg_handle->reginfo.primary; + if (LOG_COMPARE(lsn, &lp->lsn) >= 0 && (ret = + __log_check_page_lsn(env, dbp, lsn)) != 0) + return (ret); + } + LOGCOPY_FROMLSN(env, bp, lsn); + } else + memset(bp, 0, sizeof(*lsn)); + bp += sizeof(*lsn); + + uinttmp = (u_int32_t)pgno; + LOGCOPY_32(env,bp, &uinttmp); + bp += sizeof(uinttmp); + + LOGCOPY_32(env, bp, &indx); + bp += sizeof(indx); + + uinttmp = (u_int32_t)recno; + LOGCOPY_32(env,bp, &uinttmp); + bp += sizeof(uinttmp); + + if (data == NULL) { + zero = 0; + LOGCOPY_32(env, bp, &zero); + bp += sizeof(u_int32_t); + } else { + LOGCOPY_32(env, bp, &data->size); + bp += sizeof(data->size); + memcpy(bp, data->data, data->size); + bp += data->size; + } + + LOGCOPY_32(env, bp, &vflag); + bp += sizeof(vflag); + + if (olddata == NULL) { + zero = 0; + LOGCOPY_32(env, bp, &zero); + bp += sizeof(u_int32_t); + } else { + LOGCOPY_32(env, bp, &olddata->size); + bp += sizeof(olddata->size); + memcpy(bp, olddata->data, olddata->size); + bp += olddata->size; + } + + DB_ASSERT(env, + (u_int32_t)(bp - (u_int8_t *)logrec.data) <= logrec.size); + + if (is_durable || txnp == NULL) { + if ((ret = __log_put(env, rlsnp,(DBT *)&logrec, + flags | DB_LOG_NOCOPY)) == 0 && txnp != NULL) { + *lsnp = *rlsnp; + if (rlsnp != ret_lsnp) + *ret_lsnp = *rlsnp; + } + } else { + ret = 0; +#ifdef DIAGNOSTIC + /* + * Set the debug bit if we are going to log non-durable + * transactions so they will be ignored by recovery. 
+ */ + memcpy(lr->data, logrec.data, logrec.size); + rectype |= DB_debug_FLAG; + LOGCOPY_32(env, logrec.data, &rectype); + + if (!IS_REP_CLIENT(env)) + ret = __log_put(env, + rlsnp, (DBT *)&logrec, flags | DB_LOG_NOCOPY); +#endif + STAILQ_INSERT_HEAD(&txnp->logs, lr, links); + F_SET((TXN_DETAIL *)txnp->td, TXN_DTL_INMEMORY); + LSN_NOT_LOGGED(*ret_lsnp); + } + +#ifdef LOG_DIAGNOSTIC + if (ret != 0) + (void)__qam_add_print(env, + (DBT *)&logrec, ret_lsnp, DB_TXN_PRINT, NULL); +#endif + +#ifdef DIAGNOSTIC + __os_free(env, logrec.data); +#else + if (is_durable || txnp == NULL) + __os_free(env, logrec.data); +#endif + return (ret); +} + +/* + * PUBLIC: int __qam_delext_read __P((ENV *, DB **, void *, void *, + * PUBLIC: __qam_delext_args **)); + */ +int +__qam_delext_read(env, dbpp, td, recbuf, argpp) + ENV *env; + DB **dbpp; + void *td; + void *recbuf; + __qam_delext_args **argpp; +{ + __qam_delext_args *argp; + u_int32_t uinttmp; + u_int8_t *bp; + int ret; + + if ((ret = __os_malloc(env, + sizeof(__qam_delext_args) + sizeof(DB_TXN), &argp)) != 0) + return (ret); + bp = recbuf; + argp->txnp = (DB_TXN *)&argp[1]; + memset(argp->txnp, 0, sizeof(DB_TXN)); + + argp->txnp->td = td; + LOGCOPY_32(env, &argp->type, bp); + bp += sizeof(argp->type); + + LOGCOPY_32(env, &argp->txnp->txnid, bp); + bp += sizeof(argp->txnp->txnid); + + LOGCOPY_TOLSN(env, &argp->prev_lsn, bp); + bp += sizeof(DB_LSN); + + LOGCOPY_32(env, &uinttmp, bp); + argp->fileid = (int32_t)uinttmp; + bp += sizeof(uinttmp); + if (dbpp != NULL) { + *dbpp = NULL; + ret = __dbreg_id_to_db( + env, argp->txnp, dbpp, argp->fileid, 1); + } + + LOGCOPY_TOLSN(env, &argp->lsn, bp); + bp += sizeof(DB_LSN); + + LOGCOPY_32(env, &uinttmp, bp); + argp->pgno = (db_pgno_t)uinttmp; + bp += sizeof(uinttmp); + + LOGCOPY_32(env, &argp->indx, bp); + bp += sizeof(argp->indx); + + LOGCOPY_32(env, &uinttmp, bp); + argp->recno = (db_recno_t)uinttmp; + bp += sizeof(uinttmp); + + memset(&argp->data, 0, sizeof(argp->data)); + LOGCOPY_32(env,&argp->data.size, bp); + bp += sizeof(u_int32_t); + argp->data.data = bp; + bp += argp->data.size; + + *argpp = argp; + return (ret); +} + +/* + * PUBLIC: int __qam_delext_log __P((DB *, DB_TXN *, DB_LSN *, + * PUBLIC: u_int32_t, DB_LSN *, db_pgno_t, u_int32_t, db_recno_t, + * PUBLIC: const DBT *)); + */ +int +__qam_delext_log(dbp, txnp, ret_lsnp, flags, lsn, pgno, indx, recno, data) + DB *dbp; + DB_TXN *txnp; + DB_LSN *ret_lsnp; + u_int32_t flags; + DB_LSN * lsn; + db_pgno_t pgno; + u_int32_t indx; + db_recno_t recno; + const DBT *data; +{ + DBT logrec; + DB_LSN *lsnp, null_lsn, *rlsnp; + DB_TXNLOGREC *lr; + ENV *env; + u_int32_t zero, uinttmp, rectype, txn_num; + u_int npad; + u_int8_t *bp; + int is_durable, ret; + + COMPQUIET(lr, NULL); + + env = dbp->env; + rlsnp = ret_lsnp; + rectype = DB___qam_delext; + npad = 0; + ret = 0; + + if (LF_ISSET(DB_LOG_NOT_DURABLE) || + F_ISSET(dbp, DB_AM_NOT_DURABLE)) { + if (txnp == NULL) + return (0); + is_durable = 0; + } else + is_durable = 1; + + if (txnp == NULL) { + txn_num = 0; + lsnp = &null_lsn; + null_lsn.file = null_lsn.offset = 0; + } else { + if (TAILQ_FIRST(&txnp->kids) != NULL && + (ret = __txn_activekids(env, rectype, txnp)) != 0) + return (ret); + /* + * We need to assign begin_lsn while holding region mutex. + * That assignment is done inside the DbEnv->log_put call, + * so pass in the appropriate memory location to be filled + * in by the log_put code. 
+ */ + DB_SET_TXN_LSNP(txnp, &rlsnp, &lsnp); + txn_num = txnp->txnid; + } + + DB_ASSERT(env, dbp->log_filename != NULL); + if (dbp->log_filename->id == DB_LOGFILEID_INVALID && + (ret = __dbreg_lazy_id(dbp)) != 0) + return (ret); + + logrec.size = sizeof(rectype) + sizeof(txn_num) + sizeof(DB_LSN) + + sizeof(u_int32_t) + + sizeof(*lsn) + + sizeof(u_int32_t) + + sizeof(u_int32_t) + + sizeof(u_int32_t) + + sizeof(u_int32_t) + (data == NULL ? 0 : data->size); + if (CRYPTO_ON(env)) { + npad = env->crypto_handle->adj_size(logrec.size); + logrec.size += npad; + } + + if (is_durable || txnp == NULL) { + if ((ret = + __os_malloc(env, logrec.size, &logrec.data)) != 0) + return (ret); + } else { + if ((ret = __os_malloc(env, + logrec.size + sizeof(DB_TXNLOGREC), &lr)) != 0) + return (ret); +#ifdef DIAGNOSTIC + if ((ret = + __os_malloc(env, logrec.size, &logrec.data)) != 0) { + __os_free(env, lr); + return (ret); + } +#else + logrec.data = lr->data; +#endif + } + if (npad > 0) + memset((u_int8_t *)logrec.data + logrec.size - npad, 0, npad); + + bp = logrec.data; + + LOGCOPY_32(env, bp, &rectype); + bp += sizeof(rectype); + + LOGCOPY_32(env, bp, &txn_num); + bp += sizeof(txn_num); + + LOGCOPY_FROMLSN(env, bp, lsnp); + bp += sizeof(DB_LSN); + + uinttmp = (u_int32_t)dbp->log_filename->id; + LOGCOPY_32(env, bp, &uinttmp); + bp += sizeof(uinttmp); + + if (lsn != NULL) { + if (txnp != NULL) { + LOG *lp = env->lg_handle->reginfo.primary; + if (LOG_COMPARE(lsn, &lp->lsn) >= 0 && (ret = + __log_check_page_lsn(env, dbp, lsn)) != 0) + return (ret); + } + LOGCOPY_FROMLSN(env, bp, lsn); + } else + memset(bp, 0, sizeof(*lsn)); + bp += sizeof(*lsn); + + uinttmp = (u_int32_t)pgno; + LOGCOPY_32(env,bp, &uinttmp); + bp += sizeof(uinttmp); + + LOGCOPY_32(env, bp, &indx); + bp += sizeof(indx); + + uinttmp = (u_int32_t)recno; + LOGCOPY_32(env,bp, &uinttmp); + bp += sizeof(uinttmp); + + if (data == NULL) { + zero = 0; + LOGCOPY_32(env, bp, &zero); + bp += sizeof(u_int32_t); + } else { + LOGCOPY_32(env, bp, &data->size); + bp += sizeof(data->size); + memcpy(bp, data->data, data->size); + bp += data->size; + } + + DB_ASSERT(env, + (u_int32_t)(bp - (u_int8_t *)logrec.data) <= logrec.size); + + if (is_durable || txnp == NULL) { + if ((ret = __log_put(env, rlsnp,(DBT *)&logrec, + flags | DB_LOG_NOCOPY)) == 0 && txnp != NULL) { + *lsnp = *rlsnp; + if (rlsnp != ret_lsnp) + *ret_lsnp = *rlsnp; + } + } else { + ret = 0; +#ifdef DIAGNOSTIC + /* + * Set the debug bit if we are going to log non-durable + * transactions so they will be ignored by recovery. 
+ */ + memcpy(lr->data, logrec.data, logrec.size); + rectype |= DB_debug_FLAG; + LOGCOPY_32(env, logrec.data, &rectype); + + if (!IS_REP_CLIENT(env)) + ret = __log_put(env, + rlsnp, (DBT *)&logrec, flags | DB_LOG_NOCOPY); +#endif + STAILQ_INSERT_HEAD(&txnp->logs, lr, links); + F_SET((TXN_DETAIL *)txnp->td, TXN_DTL_INMEMORY); + LSN_NOT_LOGGED(*ret_lsnp); + } + +#ifdef LOG_DIAGNOSTIC + if (ret != 0) + (void)__qam_delext_print(env, + (DBT *)&logrec, ret_lsnp, DB_TXN_PRINT, NULL); +#endif + +#ifdef DIAGNOSTIC + __os_free(env, logrec.data); +#else + if (is_durable || txnp == NULL) + __os_free(env, logrec.data); +#endif + return (ret); +} + +/* + * PUBLIC: int __qam_init_recover __P((ENV *, DB_DISTAB *)); + */ +int +__qam_init_recover(env, dtabp) + ENV *env; + DB_DISTAB *dtabp; +{ + int ret; + + if ((ret = __db_add_recovery_int(env, dtabp, + __qam_incfirst_recover, DB___qam_incfirst)) != 0) + return (ret); + if ((ret = __db_add_recovery_int(env, dtabp, + __qam_mvptr_recover, DB___qam_mvptr)) != 0) + return (ret); + if ((ret = __db_add_recovery_int(env, dtabp, + __qam_del_recover, DB___qam_del)) != 0) + return (ret); + if ((ret = __db_add_recovery_int(env, dtabp, + __qam_add_recover, DB___qam_add)) != 0) + return (ret); + if ((ret = __db_add_recovery_int(env, dtabp, + __qam_delext_recover, DB___qam_delext)) != 0) + return (ret); + return (0); +} diff --git a/qam/qam_autop.c b/qam/qam_autop.c new file mode 100644 index 00000000..09d71b8a --- /dev/null +++ b/qam/qam_autop.c @@ -0,0 +1,260 @@ +/* Do not edit: automatically built by gen_rec.awk. */ + +#include "db_config.h" + +#ifdef HAVE_QUEUE +#include "db_int.h" +#include "dbinc/crypto.h" +#include "dbinc/db_page.h" +#include "dbinc/db_dispatch.h" +#include "dbinc/db_am.h" +#include "dbinc/log.h" +#include "dbinc/qam.h" +#include "dbinc/txn.h" + +/* + * PUBLIC: int __qam_incfirst_print __P((ENV *, DBT *, DB_LSN *, + * PUBLIC: db_recops, void *)); + */ +int +__qam_incfirst_print(env, dbtp, lsnp, notused2, notused3) + ENV *env; + DBT *dbtp; + DB_LSN *lsnp; + db_recops notused2; + void *notused3; +{ + __qam_incfirst_args *argp; + int ret; + + notused2 = DB_TXN_PRINT; + notused3 = NULL; + + if ((ret = + __qam_incfirst_read(env, NULL, NULL, dbtp->data, &argp)) != 0) + return (ret); + (void)printf( + "[%lu][%lu]__qam_incfirst%s: rec: %lu txnp %lx prevlsn [%lu][%lu]\n", + (u_long)lsnp->file, (u_long)lsnp->offset, + (argp->type & DB_debug_FLAG) ? "_debug" : "", + (u_long)argp->type, + (u_long)argp->txnp->txnid, + (u_long)argp->prev_lsn.file, (u_long)argp->prev_lsn.offset); + (void)printf("\tfileid: %ld\n", (long)argp->fileid); + (void)printf("\trecno: %lu\n", (u_long)argp->recno); + (void)printf("\tmeta_pgno: %lu\n", (u_long)argp->meta_pgno); + (void)printf("\n"); + __os_free(env, argp); + return (0); +} + +/* + * PUBLIC: int __qam_mvptr_print __P((ENV *, DBT *, DB_LSN *, + * PUBLIC: db_recops, void *)); + */ +int +__qam_mvptr_print(env, dbtp, lsnp, notused2, notused3) + ENV *env; + DBT *dbtp; + DB_LSN *lsnp; + db_recops notused2; + void *notused3; +{ + __qam_mvptr_args *argp; + int ret; + + notused2 = DB_TXN_PRINT; + notused3 = NULL; + + if ((ret = + __qam_mvptr_read(env, NULL, NULL, dbtp->data, &argp)) != 0) + return (ret); + (void)printf( + "[%lu][%lu]__qam_mvptr%s: rec: %lu txnp %lx prevlsn [%lu][%lu]\n", + (u_long)lsnp->file, (u_long)lsnp->offset, + (argp->type & DB_debug_FLAG) ? 
"_debug" : "", + (u_long)argp->type, + (u_long)argp->txnp->txnid, + (u_long)argp->prev_lsn.file, (u_long)argp->prev_lsn.offset); + (void)printf("\topcode: %lu\n", (u_long)argp->opcode); + (void)printf("\tfileid: %ld\n", (long)argp->fileid); + (void)printf("\told_first: %lu\n", (u_long)argp->old_first); + (void)printf("\tnew_first: %lu\n", (u_long)argp->new_first); + (void)printf("\told_cur: %lu\n", (u_long)argp->old_cur); + (void)printf("\tnew_cur: %lu\n", (u_long)argp->new_cur); + (void)printf("\tmetalsn: [%lu][%lu]\n", + (u_long)argp->metalsn.file, (u_long)argp->metalsn.offset); + (void)printf("\tmeta_pgno: %lu\n", (u_long)argp->meta_pgno); + (void)printf("\n"); + __os_free(env, argp); + return (0); +} + +/* + * PUBLIC: int __qam_del_print __P((ENV *, DBT *, DB_LSN *, + * PUBLIC: db_recops, void *)); + */ +int +__qam_del_print(env, dbtp, lsnp, notused2, notused3) + ENV *env; + DBT *dbtp; + DB_LSN *lsnp; + db_recops notused2; + void *notused3; +{ + __qam_del_args *argp; + int ret; + + notused2 = DB_TXN_PRINT; + notused3 = NULL; + + if ((ret = + __qam_del_read(env, NULL, NULL, dbtp->data, &argp)) != 0) + return (ret); + (void)printf( + "[%lu][%lu]__qam_del%s: rec: %lu txnp %lx prevlsn [%lu][%lu]\n", + (u_long)lsnp->file, (u_long)lsnp->offset, + (argp->type & DB_debug_FLAG) ? "_debug" : "", + (u_long)argp->type, + (u_long)argp->txnp->txnid, + (u_long)argp->prev_lsn.file, (u_long)argp->prev_lsn.offset); + (void)printf("\tfileid: %ld\n", (long)argp->fileid); + (void)printf("\tlsn: [%lu][%lu]\n", + (u_long)argp->lsn.file, (u_long)argp->lsn.offset); + (void)printf("\tpgno: %lu\n", (u_long)argp->pgno); + (void)printf("\tindx: %lu\n", (u_long)argp->indx); + (void)printf("\trecno: %lu\n", (u_long)argp->recno); + (void)printf("\n"); + __os_free(env, argp); + return (0); +} + +/* + * PUBLIC: int __qam_add_print __P((ENV *, DBT *, DB_LSN *, + * PUBLIC: db_recops, void *)); + */ +int +__qam_add_print(env, dbtp, lsnp, notused2, notused3) + ENV *env; + DBT *dbtp; + DB_LSN *lsnp; + db_recops notused2; + void *notused3; +{ + __qam_add_args *argp; + u_int32_t i; + int ch; + int ret; + + notused2 = DB_TXN_PRINT; + notused3 = NULL; + + if ((ret = + __qam_add_read(env, NULL, NULL, dbtp->data, &argp)) != 0) + return (ret); + (void)printf( + "[%lu][%lu]__qam_add%s: rec: %lu txnp %lx prevlsn [%lu][%lu]\n", + (u_long)lsnp->file, (u_long)lsnp->offset, + (argp->type & DB_debug_FLAG) ? "_debug" : "", + (u_long)argp->type, + (u_long)argp->txnp->txnid, + (u_long)argp->prev_lsn.file, (u_long)argp->prev_lsn.offset); + (void)printf("\tfileid: %ld\n", (long)argp->fileid); + (void)printf("\tlsn: [%lu][%lu]\n", + (u_long)argp->lsn.file, (u_long)argp->lsn.offset); + (void)printf("\tpgno: %lu\n", (u_long)argp->pgno); + (void)printf("\tindx: %lu\n", (u_long)argp->indx); + (void)printf("\trecno: %lu\n", (u_long)argp->recno); + (void)printf("\tdata: "); + for (i = 0; i < argp->data.size; i++) { + ch = ((u_int8_t *)argp->data.data)[i]; + printf(isprint(ch) || ch == 0x0a ? "%c" : "%#x ", ch); + } + (void)printf("\n"); + (void)printf("\tvflag: %lu\n", (u_long)argp->vflag); + (void)printf("\tolddata: "); + for (i = 0; i < argp->olddata.size; i++) { + ch = ((u_int8_t *)argp->olddata.data)[i]; + printf(isprint(ch) || ch == 0x0a ? 
"%c" : "%#x ", ch); + } + (void)printf("\n"); + (void)printf("\n"); + __os_free(env, argp); + return (0); +} + +/* + * PUBLIC: int __qam_delext_print __P((ENV *, DBT *, DB_LSN *, + * PUBLIC: db_recops, void *)); + */ +int +__qam_delext_print(env, dbtp, lsnp, notused2, notused3) + ENV *env; + DBT *dbtp; + DB_LSN *lsnp; + db_recops notused2; + void *notused3; +{ + __qam_delext_args *argp; + u_int32_t i; + int ch; + int ret; + + notused2 = DB_TXN_PRINT; + notused3 = NULL; + + if ((ret = + __qam_delext_read(env, NULL, NULL, dbtp->data, &argp)) != 0) + return (ret); + (void)printf( + "[%lu][%lu]__qam_delext%s: rec: %lu txnp %lx prevlsn [%lu][%lu]\n", + (u_long)lsnp->file, (u_long)lsnp->offset, + (argp->type & DB_debug_FLAG) ? "_debug" : "", + (u_long)argp->type, + (u_long)argp->txnp->txnid, + (u_long)argp->prev_lsn.file, (u_long)argp->prev_lsn.offset); + (void)printf("\tfileid: %ld\n", (long)argp->fileid); + (void)printf("\tlsn: [%lu][%lu]\n", + (u_long)argp->lsn.file, (u_long)argp->lsn.offset); + (void)printf("\tpgno: %lu\n", (u_long)argp->pgno); + (void)printf("\tindx: %lu\n", (u_long)argp->indx); + (void)printf("\trecno: %lu\n", (u_long)argp->recno); + (void)printf("\tdata: "); + for (i = 0; i < argp->data.size; i++) { + ch = ((u_int8_t *)argp->data.data)[i]; + printf(isprint(ch) || ch == 0x0a ? "%c" : "%#x ", ch); + } + (void)printf("\n"); + (void)printf("\n"); + __os_free(env, argp); + return (0); +} + +/* + * PUBLIC: int __qam_init_print __P((ENV *, DB_DISTAB *)); + */ +int +__qam_init_print(env, dtabp) + ENV *env; + DB_DISTAB *dtabp; +{ + int ret; + + if ((ret = __db_add_recovery_int(env, dtabp, + __qam_incfirst_print, DB___qam_incfirst)) != 0) + return (ret); + if ((ret = __db_add_recovery_int(env, dtabp, + __qam_mvptr_print, DB___qam_mvptr)) != 0) + return (ret); + if ((ret = __db_add_recovery_int(env, dtabp, + __qam_del_print, DB___qam_del)) != 0) + return (ret); + if ((ret = __db_add_recovery_int(env, dtabp, + __qam_add_print, DB___qam_add)) != 0) + return (ret); + if ((ret = __db_add_recovery_int(env, dtabp, + __qam_delext_print, DB___qam_delext)) != 0) + return (ret); + return (0); +} +#endif /* HAVE_QUEUE */ diff --git a/qam/qam_conv.c b/qam/qam_conv.c new file mode 100644 index 00000000..fd1d1dc4 --- /dev/null +++ b/qam/qam_conv.c @@ -0,0 +1,79 @@ +/*- + * See the file LICENSE for redistribution information. + * + * Copyright (c) 1999-2009 Oracle. All rights reserved. + * + * $Id$ + */ + +#include "db_config.h" + +#include "db_int.h" +#include "dbinc/db_page.h" +#include "dbinc/db_swap.h" +#include "dbinc/db_am.h" +#include "dbinc/qam.h" + +/* + * __qam_mswap -- + * Swap the bytes on the queue metadata page. + * + * PUBLIC: int __qam_mswap __P((ENV *, PAGE *)); + */ +int +__qam_mswap(env, pg) + ENV *env; + PAGE *pg; +{ + u_int8_t *p; + + COMPQUIET(env, NULL); + + __db_metaswap(pg); + p = (u_int8_t *)pg + sizeof(DBMETA); + + SWAP32(p); /* first_recno */ + SWAP32(p); /* cur_recno */ + SWAP32(p); /* re_len */ + SWAP32(p); /* re_pad */ + SWAP32(p); /* rec_page */ + SWAP32(p); /* page_ext */ + p += 91 * sizeof(u_int32_t); /* unused */ + SWAP32(p); /* crypto_magic */ + + return (0); +} + +/* + * __qam_pgin_out -- + * Convert host-specific page layout to/from the host-independent format + * stored on disk. 
+ * We only need to fix up a few fields in the header + * + * PUBLIC: int __qam_pgin_out __P((ENV *, db_pgno_t, void *, DBT *)); + */ +int +__qam_pgin_out(env, pg, pp, cookie) + ENV *env; + db_pgno_t pg; + void *pp; + DBT *cookie; +{ + DB_PGINFO *pginfo; + QPAGE *h; + + COMPQUIET(pg, 0); + pginfo = (DB_PGINFO *)cookie->data; + if (!F_ISSET(pginfo, DB_AM_SWAP)) + return (0); + + h = pp; + if (h->type == P_QAMMETA) + return (__qam_mswap(env, pp)); + + M_32_SWAP(h->lsn.file); + M_32_SWAP(h->lsn.offset); + M_32_SWAP(h->pgno); + + return (0); +} diff --git a/qam/qam_files.c b/qam/qam_files.c new file mode 100644 index 00000000..928fdfd1 --- /dev/null +++ b/qam/qam_files.c @@ -0,0 +1,894 @@ +/*- + * See the file LICENSE for redistribution information. + * + * Copyright (c) 1999-2009 Oracle. All rights reserved. + * + * $Id$ + */ + +#include "db_config.h" + +#include "db_int.h" +#include "dbinc/db_page.h" +#include "dbinc/db_am.h" +#include "dbinc/log.h" +#include "dbinc/fop.h" +#include "dbinc/mp.h" +#include "dbinc/qam.h" + +#define QAM_EXNAME(Q, I, B, L) \ + snprintf((B), (L), \ + QUEUE_EXTENT, (Q)->dir, PATH_SEPARATOR[0], (Q)->name, (I)) + +/* + * __qam_fprobe -- calculate and open extent + * + * Calculate which extent the page is in, open and create if necessary. + * + * PUBLIC: int __qam_fprobe __P((DBC *, db_pgno_t, + * PUBLIC: void *, qam_probe_mode, DB_CACHE_PRIORITY, u_int32_t)); + */ +int +__qam_fprobe(dbc, pgno, addrp, mode, priority, flags) + DBC *dbc; + db_pgno_t pgno; + void *addrp; + qam_probe_mode mode; + DB_CACHE_PRIORITY priority; + u_int32_t flags; +{ + DB *dbp; + DB_MPOOLFILE *mpf; + ENV *env; + MPFARRAY *array; + QUEUE *qp; + u_int8_t fid[DB_FILE_ID_LEN]; + u_int32_t i, extid, maxext, numext, lflags, offset, oldext, openflags; + char buf[DB_MAXPATHLEN]; + int ftype, less, ret, t_ret; + + dbp = dbc->dbp; + env = dbp->env; + qp = (QUEUE *)dbp->q_internal; + ret = 0; + + if (qp->page_ext == 0) { + mpf = dbp->mpf; + switch (mode) { + case QAM_PROBE_GET: + return (__memp_fget(mpf, &pgno, + dbc->thread_info, dbc->txn, flags, addrp)); + case QAM_PROBE_PUT: + return (__memp_fput(mpf, + dbc->thread_info, addrp, priority)); + case QAM_PROBE_DIRTY: + return (__memp_dirty(mpf, addrp, + dbc->thread_info, dbc->txn, priority, flags)); + case QAM_PROBE_MPF: + *(DB_MPOOLFILE **)addrp = mpf; + return (0); + } + } + + mpf = NULL; + + /* + * Need to lock long enough to find the mpf or create the file. + * The file cannot go away because we must have a record locked + * in that file. + */ + MUTEX_LOCK(env, dbp->mutex); + extid = QAM_PAGE_EXTENT(dbp, pgno); + + /* Array1 will always be in use if array2 is in use. */ + array = &qp->array1; + if (array->n_extent == 0) { + /* Start with 4 extents */ + array->n_extent = 4; + array->low_extent = extid; + numext = offset = oldext = 0; + less = 0; + goto alloc; + } + +retry: + if (extid < array->low_extent) { + less = 1; + offset = array->low_extent - extid; + } else { + less = 0; + offset = extid - array->low_extent; + } + if (qp->array2.n_extent != 0 && + (extid >= qp->array2.low_extent ? + offset > extid - qp->array2.low_extent : + offset > qp->array2.low_extent - extid)) { + array = &qp->array2; + if (extid < array->low_extent) { + less = 1; + offset = array->low_extent - extid; + } else { + less = 0; + offset = extid - array->low_extent; + } + } + + /* + * Check to see if the requested extent is outside the range of + * extents in the array. This is true by default if there are + * no extents here yet. 
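+ * If so, the code below either shifts the existing entries, closes an
+ * unpinned extent at the low end, switches to the second array, or
+ * reallocates the array so that the new extent can be tracked.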
+ */ + if (less == 1 || offset >= array->n_extent) { + oldext = array->n_extent; + numext = (array->hi_extent - array->low_extent) + 1; + if (less == 1 && offset + numext <= array->n_extent) { + /* + * If we can fit this one into the existing array by + * shifting the existing entries then we do not have + * to allocate. + */ + memmove(&array->mpfarray[offset], + array->mpfarray, numext + * sizeof(array->mpfarray[0])); + memset(array->mpfarray, 0, offset + * sizeof(array->mpfarray[0])); + offset = 0; + } else if (less == 0 && offset == array->n_extent && + (mode == QAM_PROBE_GET || mode == QAM_PROBE_PUT) && + array->mpfarray[0].pinref == 0) { + /* + * If this is at the end of the array and the file at + * the beginning has a zero pin count we can close + * the bottom extent and put this one at the end. + */ + mpf = array->mpfarray[0].mpf; + if (mpf != NULL && (ret = __memp_fclose(mpf, 0)) != 0) + goto err; + memmove(&array->mpfarray[0], &array->mpfarray[1], + (array->n_extent - 1) * sizeof(array->mpfarray[0])); + array->low_extent++; + array->hi_extent++; + offset--; + array->mpfarray[offset].mpf = NULL; + array->mpfarray[offset].pinref = 0; + } else { + /* + * See if we have wrapped around the queue. + * If it has then allocate the second array. + * Otherwise just expand the one we are using. + */ + maxext = (u_int32_t) UINT32_MAX + / (qp->page_ext * qp->rec_page); + if (offset >= maxext/2) { + array = &qp->array2; + DB_ASSERT(env, array->n_extent == 0); + oldext = 0; + array->n_extent = 4; + array->low_extent = extid; + offset = 0; + numext = 0; + } else if (array->mpfarray[0].pinref == 0) { + /* + * Check to see if there are extents marked + * for deletion at the beginning of the cache. + * If so close them so they will go away. + */ + for (i = 0; i < array->n_extent; i++) { + if (array->mpfarray[i].pinref != 0) + break; + mpf = array->mpfarray[i].mpf; + if (mpf == NULL) + continue; + (void)__memp_get_flags(mpf, &lflags); + if (!FLD_ISSET(lflags, DB_MPOOL_UNLINK)) + break; + + array->mpfarray[i].mpf = NULL; + if ((ret = __memp_fclose(mpf, 0)) != 0) + goto err; + } + if (i == 0) + goto increase; + memmove(&array->mpfarray[0], + &array->mpfarray[i], + (array->n_extent - i) * + sizeof(array->mpfarray[0])); + memset(&array->mpfarray[array->n_extent - i], + '\0', i * sizeof(array->mpfarray[0])); + array->low_extent += i; + array->hi_extent += i; + goto retry; + } else { + /* + * Increase the size to at least include + * the new one and double it. + */ +increase: array->n_extent += offset; + array->n_extent <<= 2; + } +alloc: if ((ret = __os_realloc(env, + array->n_extent * sizeof(struct __qmpf), + &array->mpfarray)) != 0) + goto err; + + if (less == 1) { + /* + * Move the array up and put the new one + * in the first slot. + */ + memmove(&array->mpfarray[offset], + array->mpfarray, + numext * sizeof(array->mpfarray[0])); + memset(array->mpfarray, 0, + offset * sizeof(array->mpfarray[0])); + memset(&array->mpfarray[numext + offset], 0, + (array->n_extent - (numext + offset)) + * sizeof(array->mpfarray[0])); + offset = 0; + } + else + /* Clear the new part of the array. */ + memset(&array->mpfarray[oldext], 0, + (array->n_extent - oldext) * + sizeof(array->mpfarray[0])); + } + } + + /* Update the low and hi range of saved extents. */ + if (extid < array->low_extent) + array->low_extent = extid; + if (extid > array->hi_extent) + array->hi_extent = extid; + + /* If the extent file is not yet open, open it. 
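+ * Each extent is a separate DB_MPOOLFILE; its fileid is computed from
+ * the main file's fileid by __qam_exid so that every site derives the
+ * same id for the same extent.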
*/ + if (array->mpfarray[offset].mpf == NULL) { + QAM_EXNAME(qp, extid, buf, sizeof(buf)); + if ((ret = __memp_fcreate( + env, &array->mpfarray[offset].mpf)) != 0) + goto err; + mpf = array->mpfarray[offset].mpf; + (void)__memp_set_lsn_offset(mpf, 0); + (void)__memp_set_pgcookie(mpf, &qp->pgcookie); + (void)__memp_get_ftype(dbp->mpf, &ftype); + (void)__memp_set_ftype(mpf, ftype); + (void)__memp_set_clear_len(mpf, dbp->pgsize); + + /* Set up the fileid for this extent. */ + __qam_exid(dbp, fid, extid); + (void)__memp_set_fileid(mpf, fid); + openflags = DB_EXTENT; + if (LF_ISSET(DB_MPOOL_CREATE)) + openflags |= DB_CREATE; + if (F_ISSET(dbp, DB_AM_RDONLY)) + openflags |= DB_RDONLY; + if (F_ISSET(env->dbenv, DB_ENV_DIRECT_DB)) + openflags |= DB_DIRECT; + if ((ret = __memp_fopen(mpf, NULL, + buf, NULL, openflags, qp->mode, dbp->pgsize)) != 0) { + array->mpfarray[offset].mpf = NULL; + (void)__memp_fclose(mpf, 0); + goto err; + } + } + + /* + * We have found the right file. Update its ref count + * before dropping the dbp mutex so it does not go away. + */ + mpf = array->mpfarray[offset].mpf; + if (mode == QAM_PROBE_GET) + array->mpfarray[offset].pinref++; + + /* + * If we may create the page, then we are writing, + * the file may nolonger be empty after this operation + * so we clear the UNLINK flag. + */ + if (LF_ISSET(DB_MPOOL_CREATE)) + (void)__memp_set_flags(mpf, DB_MPOOL_UNLINK, 0); + +err: + MUTEX_UNLOCK(env, dbp->mutex); + + if (ret == 0) { + pgno--; + pgno %= qp->page_ext; + switch (mode) { + case QAM_PROBE_GET: + ret = __memp_fget(mpf, &pgno, + dbc->thread_info, dbc->txn, flags, addrp); + if (ret == 0) + return (0); + break; + case QAM_PROBE_PUT: + ret = __memp_fput(mpf, + dbc->thread_info, addrp, dbp->priority); + break; + case QAM_PROBE_DIRTY: + return (__memp_dirty(mpf, addrp, + dbc->thread_info, dbc->txn, dbp->priority, flags)); + case QAM_PROBE_MPF: + *(DB_MPOOLFILE **)addrp = mpf; + return (0); + } + + MUTEX_LOCK(env, dbp->mutex); + /* Recalculate because we dropped the lock. */ + offset = extid - array->low_extent; + DB_ASSERT(env, array->mpfarray[offset].pinref > 0); + if (--array->mpfarray[offset].pinref == 0 && + (mode == QAM_PROBE_GET || ret == 0)) { + /* Check to see if this file will be unlinked. */ + (void)__memp_get_flags(mpf, &flags); + if (LF_ISSET(DB_MPOOL_UNLINK)) { + array->mpfarray[offset].mpf = NULL; + if ((t_ret = + __memp_fclose(mpf, 0)) != 0 && ret == 0) + ret = t_ret; + } + } + MUTEX_UNLOCK(env, dbp->mutex); + } + return (ret); +} + +/* + * __qam_fclose -- close an extent. + * + * Calculate which extent the page is in and close it. + * We assume the mpf entry is present. + * + * PUBLIC: int __qam_fclose __P((DB *, db_pgno_t)); + */ +int +__qam_fclose(dbp, pgnoaddr) + DB *dbp; + db_pgno_t pgnoaddr; +{ + DB_MPOOLFILE *mpf; + ENV *env; + MPFARRAY *array; + QUEUE *qp; + u_int32_t extid, offset; + int ret; + + ret = 0; + env = dbp->env; + qp = (QUEUE *)dbp->q_internal; + + MUTEX_LOCK(env, dbp->mutex); + + extid = QAM_PAGE_EXTENT(dbp, pgnoaddr); + array = &qp->array1; + if (array->low_extent > extid || array->hi_extent < extid) + array = &qp->array2; + offset = extid - array->low_extent; + + DB_ASSERT(env, + extid >= array->low_extent && offset < array->n_extent); + + /* If other threads are still using this file, leave it. 
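+ * A non-zero pin count means another thread still holds a reference
+ * to this extent, so just skip the close.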
*/ + if (array->mpfarray[offset].pinref != 0) + goto done; + + mpf = array->mpfarray[offset].mpf; + array->mpfarray[offset].mpf = NULL; + ret = __memp_fclose(mpf, 0); + +done: + MUTEX_UNLOCK(env, dbp->mutex); + return (ret); +} + +/* + * __qam_fremove -- remove an extent. + * + * Calculate which extent the page is in and remove it. There is no way + * to remove an extent without probing it first and seeing that is is empty + * so we assume the mpf entry is present. + * + * PUBLIC: int __qam_fremove __P((DB *, db_pgno_t)); + */ +int +__qam_fremove(dbp, pgnoaddr) + DB *dbp; + db_pgno_t pgnoaddr; +{ + DB_MPOOLFILE *mpf; + ENV *env; + MPFARRAY *array; + QUEUE *qp; + u_int32_t extid, offset; + int ret; + + qp = (QUEUE *)dbp->q_internal; + env = dbp->env; + ret = 0; + + MUTEX_LOCK(env, dbp->mutex); + + extid = QAM_PAGE_EXTENT(dbp, pgnoaddr); + array = &qp->array1; + if (array->low_extent > extid || array->hi_extent < extid) + array = &qp->array2; + offset = extid - array->low_extent; + + DB_ASSERT(env, + extid >= array->low_extent && offset < array->n_extent); + + mpf = array->mpfarray[offset].mpf; + /* This extent my already be marked for delete and closed. */ + if (mpf == NULL) + goto err; + + /* + * The log must be flushed before the file is deleted. We depend on + * the log record of the last delete to recreate the file if we crash. + */ + if (LOGGING_ON(env) && (ret = __log_flush(env, NULL)) != 0) + goto err; + + (void)__memp_set_flags(mpf, DB_MPOOL_UNLINK, 1); + /* Someone could be real slow, let them close it down. */ + if (array->mpfarray[offset].pinref != 0) + goto err; + array->mpfarray[offset].mpf = NULL; + if ((ret = __memp_fclose(mpf, 0)) != 0) + goto err; + + /* + * If the file is at the bottom of the array + * shift things down and adjust the end points. + */ + if (offset == 0) { + memmove(array->mpfarray, &array->mpfarray[1], + (array->hi_extent - array->low_extent) + * sizeof(array->mpfarray[0])); + array->mpfarray[ + array->hi_extent - array->low_extent].mpf = NULL; + if (array->low_extent != array->hi_extent) + array->low_extent++; + } else { + if (extid == array->hi_extent) + array->hi_extent--; + } + +err: MUTEX_UNLOCK(env, dbp->mutex); + + return (ret); +} + +/* + * __qam_sync -- + * Flush the database cache. + * + * PUBLIC: int __qam_sync __P((DB *)); + */ +int +__qam_sync(dbp) + DB *dbp; +{ + int ret; + /* + * We can't easily identify the extent files associated with a specific + * Queue file, so flush all Queue extent files. + */ + if ((ret = __memp_fsync(dbp->mpf)) != 0) + return (ret); + if (((QUEUE *)dbp->q_internal)->page_ext != 0) + return (__memp_sync_int( + dbp->env, NULL, 0, DB_SYNC_QUEUE_EXTENT, NULL, NULL)); + return (0); +} + +/* + * __qam_gen_filelist -- generate a list of extent files. + * Another thread may close the handle so this should only + * be used single threaded or with care. + * + * PUBLIC: int __qam_gen_filelist __P((DB *, + * PUBLIC: DB_THREAD_INFO *, QUEUE_FILELIST **)); + */ +int +__qam_gen_filelist(dbp, ip, filelistp) + DB *dbp; + DB_THREAD_INFO *ip; + QUEUE_FILELIST **filelistp; +{ + DBC *dbc; + DB_MPOOLFILE *mpf; + ENV *env; + QMETA *meta; + QUEUE *qp; + size_t extent_cnt; + db_recno_t i, current, first, stop, rec_extent; + QUEUE_FILELIST *fp; + int ret; + + env = dbp->env; + mpf = dbp->mpf; + qp = (QUEUE *)dbp->q_internal; + *filelistp = NULL; + + if (qp->page_ext == 0) + return (0); + + /* This may happen during metapage recovery. */ + if (qp->name == NULL) + return (0); + + /* Find out the first and last record numbers in the database. 
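+ * Both are kept in the queue metadata page (PGNO_BASE_MD).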
*/ + i = PGNO_BASE_MD; + if ((ret = __memp_fget(mpf, &i, ip, NULL, 0, &meta)) != 0) + return (ret); + + current = meta->cur_recno; + first = meta->first_recno; + + if ((ret = __memp_fput(mpf, ip, meta, dbp->priority)) != 0) + return (ret); + + /* + * Allocate the extent array. Calculate the worst case number of + * pages and convert that to a count of extents. The count of + * extents has 3 or 4 extra slots: + * roundoff at first (e.g., current record in extent); + * roundoff at current (e.g., first record in extent); + * NULL termination; and + * UINT32_MAX wraparound (the last extent can be small). + */ + rec_extent = qp->rec_page * qp->page_ext; + if (current >= first) + extent_cnt = (current - first) / rec_extent + 3; + else + extent_cnt = + (current + (UINT32_MAX - first)) / rec_extent + 4; + + if (extent_cnt == 0) + return (0); + if ((ret = __os_calloc(env, + extent_cnt, sizeof(QUEUE_FILELIST), filelistp)) != 0) + return (ret); + fp = *filelistp; + if ((ret = __db_cursor(dbp, ip, NULL, &dbc, 0)) != 0) + return (ret); + +again: + if (current >= first) + stop = current; + else + stop = UINT32_MAX; + + /* + * Make sure that first is at the same offset in the extent as stop. + * This guarantees that the stop will be reached in the loop below, + * even if it is the only record in its extent. This calculation is + * safe because first won't move out of its extent. + */ + first -= first % rec_extent; + first += stop % rec_extent; + + for (i = first; i >= first && i <= stop; i += rec_extent) { + if ((ret = __qam_fprobe(dbc, QAM_RECNO_PAGE(dbp, i), + &fp->mpf, QAM_PROBE_MPF, dbp->priority, 0)) != 0) { + if (ret == ENOENT) + continue; + goto err; + } + fp->id = QAM_RECNO_EXTENT(dbp, i); + fp++; + DB_ASSERT(env, (size_t)(fp - *filelistp) < extent_cnt); + } + + if (current < first) { + first = 1; + goto again; + } + +err: (void)__dbc_close(dbc); + return (ret); +} + +/* + * __qam_extent_names -- generate a list of extent files names. + * + * PUBLIC: int __qam_extent_names __P((ENV *, char *, char ***)); + */ +int +__qam_extent_names(env, name, namelistp) + ENV *env; + char *name; + char ***namelistp; +{ + DB *dbp; + DB_THREAD_INFO *ip; + QUEUE *qp; + QUEUE_FILELIST *filelist, *fp; + size_t len; + int cnt, ret, t_ret; + char buf[DB_MAXPATHLEN], **cp, *freep; + + *namelistp = NULL; + filelist = NULL; + ENV_GET_THREAD_INFO(env, ip); + if ((ret = __db_create_internal(&dbp, env, 0)) != 0) + return (ret); + if ((ret = __db_open(dbp, ip, + NULL, name, NULL, DB_QUEUE, DB_RDONLY, 0, PGNO_BASE_MD)) != 0) + goto done; + qp = dbp->q_internal; + if (qp->page_ext == 0) + goto done; + + if ((ret = __qam_gen_filelist(dbp, ip, &filelist)) != 0) + goto done; + + if (filelist == NULL) + goto done; + + cnt = 0; + for (fp = filelist; fp->mpf != NULL; fp++) + cnt++; + + /* QUEUE_EXTENT contains extra chars, but add 6 anyway for the int. 
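+ * The single allocation holds the NULL-terminated pointer array followed
+ * by the extent name strings themselves.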
*/ + len = (size_t)cnt * (sizeof(**namelistp) + + strlen(QUEUE_EXTENT) + strlen(qp->dir) + strlen(qp->name) + 6); + + if ((ret = __os_malloc(dbp->env, len, namelistp)) != 0) + goto done; + cp = *namelistp; + freep = (char *)(cp + cnt + 1); + for (fp = filelist; fp->mpf != NULL; fp++) { + QAM_EXNAME(qp, fp->id, buf, sizeof(buf)); + len = strlen(buf); + *cp++ = freep; + (void)strcpy(freep, buf); + freep += len + 1; + } + *cp = NULL; + +done: + if (filelist != NULL) + __os_free(dbp->env, filelist); + if ((t_ret = __db_close(dbp, NULL, DB_NOSYNC)) != 0 && ret == 0) + ret = t_ret; + + return (ret); +} + +/* + * __qam_exid -- + * Generate a fileid for an extent based on the fileid of the main + * file. Since we do not log schema creates/deletes explicitly, the log + * never captures the fileid of an extent file. In order that masters and + * replicas have the same fileids (so they can explicitly delete them), we + * use computed fileids for the extent files of Queue files. + * + * An extent file id retains the low order 12 bytes of the file id and + * overwrites the dev/inode fields, placing a 0 in the inode field, and + * the extent number in the dev field. + * + * PUBLIC: void __qam_exid __P((DB *, u_int8_t *, u_int32_t)); + */ +void +__qam_exid(dbp, fidp, exnum) + DB *dbp; + u_int8_t *fidp; + u_int32_t exnum; +{ + int i; + u_int8_t *p; + + /* Copy the fileid from the master. */ + memcpy(fidp, dbp->fileid, DB_FILE_ID_LEN); + + /* The first four bytes are the inode or the FileIndexLow; 0 it. */ + for (i = sizeof(u_int32_t); i > 0; --i) + *fidp++ = 0; + + /* The next four bytes are the dev/FileIndexHigh; insert the exnum . */ + for (p = (u_int8_t *)&exnum, i = sizeof(u_int32_t); i > 0; --i) + *fidp++ = *p++; +} + +/* + * __qam_nameop -- + * Remove or rename extent files associated with a particular file. + * This is to remove or rename (both in mpool and the file system) any + * extent files associated with the given dbp. + * This is either called from the QUEUE remove or rename methods or + * when undoing a transaction that created the database. + * + * PUBLIC: int __qam_nameop __P((DB *, DB_TXN *, const char *, qam_name_op)); + */ +int +__qam_nameop(dbp, txn, newname, op) + DB *dbp; + DB_TXN *txn; + const char *newname; + qam_name_op op; +{ + ENV *env; + QUEUE *qp; + size_t exlen, fulllen, len; + u_int8_t fid[DB_FILE_ID_LEN]; + u_int32_t exid; + int cnt, i, ret, t_ret; + char buf[DB_MAXPATHLEN], nbuf[DB_MAXPATHLEN], sepsave; + char *endname, *endpath, *exname, *fullname, **names; + char *ndir, *namep, *new, *cp; + + env = dbp->env; + qp = (QUEUE *)dbp->q_internal; + cnt = ret = t_ret = 0; + namep = exname = fullname = NULL; + names = NULL; + + /* If this isn't a queue with extents, we're done. */ + if (qp->page_ext == 0) + return (0); + + /* + * Generate the list of all queue extents for this file (from the + * file system) and then cycle through removing them and evicting + * from mpool. We have two modes of operation here. If we are + * undoing log operations, then do not write log records and try + * to keep going even if we encounter failures in nameop. If we + * are in mainline code, then return as soon as we have a problem. + * Memory allocation errors (__db_appname, __os_malloc) are always + * considered failure. 
+ * + * Set buf to : dir/__dbq.NAME.0 and fullname to HOME/dir/__dbq.NAME.0 + * or, in the case of an absolute path: /dir/__dbq.NAME.0 + */ + QAM_EXNAME(qp, 0, buf, sizeof(buf)); + if ((ret = __db_appname(env, + DB_APP_DATA, buf, &dbp->dirname, &fullname)) != 0) + return (ret); + + /* We should always have a path separator here. */ + if ((endpath = __db_rpath(fullname)) == NULL) { + ret = EINVAL; + goto err; + } + sepsave = *endpath; + *endpath = '\0'; + + /* + * Get the list of all names in the directory and restore the + * path separator. + */ + if ((ret = __os_dirlist(env, fullname, 0, &names, &cnt)) != 0) + goto err; + *endpath = sepsave; + + /* If there aren't any names, don't allocate any space. */ + if (cnt == 0) + goto err; + + /* + * Now, make endpath reference the queue extent names upon which + * we can match. Then we set the end of the path to be the + * beginning of the extent number, and we can compare the bytes + * between endpath and endname (__dbq.NAME.). + */ + endpath++; + endname = strrchr(endpath, '.'); + if (endname == NULL) { + ret = EINVAL; + goto err; + } + ++endname; + *endname = '\0'; + len = strlen(endpath); + fulllen = strlen(fullname); + + /* Allocate space for a full extent name. */ + exlen = fulllen + 20; + if ((ret = __os_malloc(env, exlen, &exname)) != 0) + goto err; + + ndir = new = NULL; + if (newname != NULL) { + if ((ret = __os_strdup(env, newname, &namep)) != 0) + goto err; + ndir = namep; + if ((new = __db_rpath(namep)) != NULL) + *new++ = '\0'; + else { + new = namep; + ndir = PATH_DOT; + } + } + for (i = 0; i < cnt; i++) { + /* Check if this is a queue extent file. */ + if (strncmp(names[i], endpath, len) != 0) + continue; + /* Make sure we have all numbers. foo.db vs. foo.db.0. */ + for (cp = &names[i][len]; *cp != '\0'; cp++) + if (!isdigit((int)*cp)) + break; + if (*cp != '\0') + continue; + + /* + * We have a queue extent file. We need to generate its + * name and its fileid. + */ + exid = (u_int32_t)strtoul(names[i] + len, NULL, 10); + __qam_exid(dbp, fid, exid); + + switch (op) { + case QAM_NAME_DISCARD: + snprintf(exname, exlen, + "%s%s", fullname, names[i] + len); + if ((t_ret = __memp_nameop(dbp->env, + fid, NULL, exname, NULL, + F_ISSET(dbp, DB_AM_INMEM))) != 0 && ret == 0) + ret = t_ret; + break; + + case QAM_NAME_RENAME: + snprintf(nbuf, sizeof(nbuf), QUEUE_EXTENT, + ndir, PATH_SEPARATOR[0], new, exid); + QAM_EXNAME(qp, exid, buf, sizeof(buf)); + if ((ret = __fop_rename(env, + txn, buf, nbuf, &dbp->dirname, fid, DB_APP_DATA, 1, + F_ISSET(dbp, DB_AM_NOT_DURABLE) ? + DB_LOG_NOT_DURABLE : 0)) != 0) + goto err; + break; + + case QAM_NAME_REMOVE: + QAM_EXNAME(qp, exid, buf, sizeof(buf)); + if ((ret = __fop_remove(env, txn, fid, + buf, &dbp->dirname, + DB_APP_DATA, F_ISSET(dbp, DB_AM_NOT_DURABLE) ? + DB_LOG_NOT_DURABLE : 0)) != 0) + goto err; + break; + } + } + +err: if (fullname != NULL) + __os_free(env, fullname); + if (exname != NULL) + __os_free(env, exname); + if (namep != NULL) + __os_free(env, namep); + if (names != NULL) + __os_dirfree(env, names, cnt); + return (ret); +} + +/* + * __qam_lsn_reset -- reset the lsns for extents. 
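+ * Walk the extent files and clear their page LSNs with __db_lsn_reset,
+ * typically when a database file is being moved to a new environment.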
+ * + * PUBLIC: int __qam_lsn_reset __P((DB *, DB_THREAD_INFO *)); + */ +int +__qam_lsn_reset(dbp, ip) + DB *dbp; + DB_THREAD_INFO *ip; +{ + QUEUE *qp; + QUEUE_FILELIST *filelist, *fp; + int ret; + + qp = dbp->q_internal; + if (qp->page_ext == 0) + return (0); + + if ((ret = __qam_gen_filelist(dbp, ip, &filelist)) != 0) + return (ret); + + if (filelist == NULL) + return (ret); + + for (fp = filelist; fp->mpf != NULL; fp++) + if ((ret = __db_lsn_reset(fp->mpf, ip)) != 0) + break; + + __os_free(dbp->env, filelist); + return (ret); +} diff --git a/qam/qam_method.c b/qam/qam_method.c new file mode 100644 index 00000000..af22794e --- /dev/null +++ b/qam/qam_method.c @@ -0,0 +1,398 @@ +/*- + * See the file LICENSE for redistribution information. + * + * Copyright (c) 2001, 2010 Oracle and/or its affiliates. All rights reserved. + * + * $Id$ + */ + +#include "db_config.h" + +#include "db_int.h" +#include "dbinc/db_page.h" +#include "dbinc/db_am.h" +#include "dbinc/lock.h" +#include "dbinc/mp.h" +#include "dbinc/qam.h" +#include "dbinc/txn.h" + +static int __qam_rr __P((DB *, DB_THREAD_INFO *, DB_TXN *, + const char *, const char *, const char *, qam_name_op)); +static int __qam_set_extentsize __P((DB *, u_int32_t)); + +/* + * __qam_db_create -- + * Queue specific initialization of the DB structure. + * + * PUBLIC: int __qam_db_create __P((DB *)); + */ +int +__qam_db_create(dbp) + DB *dbp; +{ + QUEUE *t; + int ret; + + /* Allocate and initialize the private queue structure. */ + if ((ret = __os_calloc(dbp->env, 1, sizeof(QUEUE), &t)) != 0) + return (ret); + dbp->q_internal = t; + dbp->get_q_extentsize = __qam_get_extentsize; + dbp->set_q_extentsize = __qam_set_extentsize; + + t->re_pad = ' '; + + return (0); +} + +/* + * __qam_db_close -- + * Queue specific discard of the DB structure. + * + * PUBLIC: int __qam_db_close __P((DB *, u_int32_t)); + */ +int +__qam_db_close(dbp, flags) + DB *dbp; + u_int32_t flags; +{ + DB_MPOOLFILE *mpf; + MPFARRAY *array; + QUEUE *t; + struct __qmpf *mpfp; + u_int32_t i; + int ret, t_ret; + + ret = 0; + if ((t = dbp->q_internal) == NULL) + return (0); + + array = &t->array1; +again: + mpfp = array->mpfarray; + if (mpfp != NULL) { + for (i = array->low_extent; + i <= array->hi_extent; i++, mpfp++) { + mpf = mpfp->mpf; + mpfp->mpf = NULL; + if (mpf != NULL && (t_ret = __memp_fclose(mpf, + LF_ISSET(DB_AM_DISCARD) ? DB_MPOOL_DISCARD : 0)) + != 0 && ret == 0) + ret = t_ret; + } + __os_free(dbp->env, array->mpfarray); + } + if (t->array2.n_extent != 0) { + array = &t->array2; + array->n_extent = 0; + goto again; + } + + if (LF_ISSET(DB_AM_DISCARD) && + (t_ret = __qam_nameop(dbp, NULL, + NULL, QAM_NAME_DISCARD)) != 0 && ret == 0) + ret = t_ret; + + if (t->path != NULL) + __os_free(dbp->env, t->path); + __os_free(dbp->env, t); + dbp->q_internal = NULL; + + return (ret); +} + +/* + * __qam_get_extentsize -- + * The DB->q_get_extentsize method. 
+ * + * PUBLIC: int __qam_get_extentsize __P((DB *, u_int32_t *)); + */ +int +__qam_get_extentsize(dbp, q_extentsizep) + DB *dbp; + u_int32_t *q_extentsizep; +{ + *q_extentsizep = ((QUEUE*)dbp->q_internal)->page_ext; + return (0); +} + +static int +__qam_set_extentsize(dbp, extentsize) + DB *dbp; + u_int32_t extentsize; +{ + DB_ILLEGAL_AFTER_OPEN(dbp, "DB->set_extentsize"); + + if (extentsize < 1) { + __db_errx(dbp->env, "Extent size must be at least 1"); + return (EINVAL); + } + + ((QUEUE*)dbp->q_internal)->page_ext = extentsize; + + return (0); +} + +/* + * __queue_pageinfo - + * Given a dbp, get first/last page information about a queue. + * + * PUBLIC: int __queue_pageinfo __P((DB *, db_pgno_t *, db_pgno_t *, + * PUBLIC: int *, int, u_int32_t)); + */ +int +__queue_pageinfo(dbp, firstp, lastp, emptyp, prpage, flags) + DB *dbp; + db_pgno_t *firstp, *lastp; + int *emptyp; + int prpage; + u_int32_t flags; +{ + DB_MPOOLFILE *mpf; + DB_THREAD_INFO *ip; + QMETA *meta; + db_pgno_t first, i, last; + int empty, ret, t_ret; + + mpf = dbp->mpf; + ENV_GET_THREAD_INFO(dbp->env, ip); + + /* Find out the page number of the last page in the database. */ + i = PGNO_BASE_MD; + if ((ret = __memp_fget(mpf, &i, ip, NULL, 0, &meta)) != 0) + return (ret); + + first = QAM_RECNO_PAGE(dbp, meta->first_recno); + last = QAM_RECNO_PAGE( + dbp, meta->cur_recno == 1 ? 1 : meta->cur_recno - 1); + + empty = meta->cur_recno == meta->first_recno; + if (firstp != NULL) + *firstp = first; + if (lastp != NULL) + *lastp = last; + if (emptyp != NULL) + *emptyp = empty; +#ifdef HAVE_STATISTICS + if (prpage) + ret = __db_prpage(dbp, (PAGE *)meta, flags); +#else + COMPQUIET(prpage, 0); + COMPQUIET(flags, 0); +#endif + + if ((t_ret = __memp_fput(mpf, + ip, meta, dbp->priority)) != 0 && ret == 0) + ret = t_ret; + + return (ret); +} + +#ifdef HAVE_STATISTICS +/* + * __db_prqueue -- + * Print out a queue + * + * PUBLIC: int __db_prqueue __P((DB *, u_int32_t)); + */ +int +__db_prqueue(dbp, flags) + DB *dbp; + u_int32_t flags; +{ + DBC *dbc; + DB_THREAD_INFO *ip; + PAGE *h; + db_pgno_t first, i, last, pg_ext, stop; + int empty, ret, t_ret; + + if ((ret = __queue_pageinfo(dbp, &first, &last, &empty, 1, flags)) != 0) + return (ret); + + if (empty || ret != 0) + return (ret); + + ENV_GET_THREAD_INFO(dbp->env, ip); + if ((ret = __db_cursor(dbp, ip, NULL, &dbc, 0)) != 0) + return (ret); + i = first; + if (first > last) + stop = QAM_RECNO_PAGE(dbp, UINT32_MAX); + else + stop = last; + + /* Dump each page. */ + pg_ext = ((QUEUE *)dbp->q_internal)->page_ext; +begin: + for (; i <= stop; ++i) { + if ((ret = __qam_fget(dbc, &i, 0, &h)) != 0) { + if (pg_ext == 0) { + if (ret == DB_PAGE_NOTFOUND && first == last) + ret = 0; + goto err; + } + if (ret == ENOENT || ret == DB_PAGE_NOTFOUND) { + i += (pg_ext - ((i - 1) % pg_ext)) - 1; + ret = 0; + continue; + } + goto err; + } + (void)__db_prpage(dbp, h, flags); + if ((ret = __qam_fput(dbc, i, h, dbp->priority)) != 0) + goto err; + } + + if (first > last) { + i = 1; + stop = last; + first = last; + goto begin; + } + +err: + if ((t_ret = __dbc_close(dbc)) != 0 && ret == 0) + ret = t_ret; + return (ret); +} +#endif + +/* + * __qam_remove -- + * Remove method for a Queue. 
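+ * Only the extent files need queue-specific handling here; the primary
+ * file is removed by the common DB remove path that invokes this hook.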
+ * + * PUBLIC: int __qam_remove __P((DB *, DB_THREAD_INFO *, DB_TXN *, + * PUBLIC: const char *, const char *, u_int32_t)); + */ +int +__qam_remove(dbp, ip, txn, name, subdb, flags) + DB *dbp; + DB_THREAD_INFO *ip; + DB_TXN *txn; + const char *name, *subdb; + u_int32_t flags; +{ + COMPQUIET(flags, 0); + return (__qam_rr(dbp, ip, txn, name, subdb, NULL, QAM_NAME_REMOVE)); +} + +/* + * __qam_rename -- + * Rename method for a Queue. + * + * PUBLIC: int __qam_rename __P((DB *, DB_THREAD_INFO *, DB_TXN *, + * PUBLIC: const char *, const char *, const char *)); + */ +int +__qam_rename(dbp, ip, txn, name, subdb, newname) + DB *dbp; + DB_THREAD_INFO *ip; + DB_TXN *txn; + const char *name, *subdb, *newname; +{ + return (__qam_rr(dbp, ip, txn, name, subdb, newname, QAM_NAME_RENAME)); +} + +/* + * __qam_rr -- + * Remove/Rename method for a Queue. + */ +static int +__qam_rr(dbp, ip, txn, name, subdb, newname, op) + DB *dbp; + DB_THREAD_INFO *ip; + DB_TXN *txn; + const char *name, *subdb, *newname; + qam_name_op op; +{ + DB *tmpdbp; + ENV *env; + QUEUE *qp; + int ret, t_ret; + + env = dbp->env; + ret = 0; + + if (subdb != NULL && name != NULL) { + __db_errx(env, + "Queue does not support multiple databases per file"); + return (EINVAL); + } + + /* + * Since regular rename no longer opens the database, we may have + * to do it here. + */ + if (F_ISSET(dbp, DB_AM_OPEN_CALLED)) + tmpdbp = dbp; + else { + if ((ret = __db_create_internal(&tmpdbp, env, 0)) != 0) + return (ret); + + /* + * We need to make sure we don't self-deadlock, so give + * this dbp the same locker as the incoming one. + */ + tmpdbp->locker = dbp->locker; + if ((ret = __db_open(tmpdbp, ip, txn, + name, NULL, DB_QUEUE, DB_RDONLY, 0, PGNO_BASE_MD)) != 0) + goto err; + } + + qp = (QUEUE *)tmpdbp->q_internal; + if (qp->page_ext != 0) + ret = __qam_nameop(tmpdbp, txn, newname, op); + + if (!F_ISSET(dbp, DB_AM_OPEN_CALLED)) { +err: /* + * Since we copied the locker ID from the dbp, we'd better not + * free it here. + */ + tmpdbp->locker = NULL; + + /* We need to remove the lock event we associated with this. */ + if (txn != NULL) + __txn_remlock(env, + txn, &tmpdbp->handle_lock, DB_LOCK_INVALIDID); + + if ((t_ret = __db_close(tmpdbp, + txn, DB_NOSYNC)) != 0 && ret == 0) + ret = t_ret; + } + return (ret); +} + +/* + * __qam_map_flags -- + * Map queue-specific flags from public to the internal values. + * + * PUBLIC: void __qam_map_flags __P((DB *, u_int32_t *, u_int32_t *)); + */ +void +__qam_map_flags(dbp, inflagsp, outflagsp) + DB *dbp; + u_int32_t *inflagsp, *outflagsp; +{ + COMPQUIET(dbp, NULL); + + if (FLD_ISSET(*inflagsp, DB_INORDER)) { + FLD_SET(*outflagsp, DB_AM_INORDER); + FLD_CLR(*inflagsp, DB_INORDER); + } +} + +/* + * __qam_set_flags -- + * Set queue-specific flags. + * + * PUBLIC: int __qam_set_flags __P((DB *, u_int32_t *flagsp)); + */ +int +__qam_set_flags(dbp, flagsp) + DB *dbp; + u_int32_t *flagsp; +{ + + __qam_map_flags(dbp, flagsp, &dbp->flags); + return (0); +} diff --git a/qam/qam_open.c b/qam/qam_open.c new file mode 100644 index 00000000..7c733aa7 --- /dev/null +++ b/qam/qam_open.c @@ -0,0 +1,352 @@ +/*- + * See the file LICENSE for redistribution information. + * + * Copyright (c) 1999-2009 Oracle. All rights reserved. 
+ * + * $Id$ + */ + +#include "db_config.h" + +#include "db_int.h" +#include "dbinc/crypto.h" +#include "dbinc/db_page.h" +#include "dbinc/db_swap.h" +#include "dbinc/db_am.h" +#include "dbinc/lock.h" +#include "dbinc/mp.h" +#include "dbinc/qam.h" +#include "dbinc/fop.h" + +static int __qam_init_meta __P((DB *, QMETA *)); + +/* + * __qam_open + * + * PUBLIC: int __qam_open __P((DB *, DB_THREAD_INFO *, + * PUBLIC: DB_TXN *, const char *, db_pgno_t, int, u_int32_t)); + */ +int +__qam_open(dbp, ip, txn, name, base_pgno, mode, flags) + DB *dbp; + DB_THREAD_INFO *ip; + DB_TXN *txn; + const char *name; + db_pgno_t base_pgno; + int mode; + u_int32_t flags; +{ + DBC *dbc; + DB_LOCK metalock; + DB_MPOOLFILE *mpf; + ENV *env; + QMETA *qmeta; + QUEUE *t; + int ret, t_ret; + + env = dbp->env; + mpf = dbp->mpf; + t = dbp->q_internal; + ret = 0; + qmeta = NULL; + + if (name == NULL && t->page_ext != 0) { + __db_errx(env, + "Extent size may not be specified for in-memory queue database"); + return (EINVAL); + } + + if (MULTIVERSION(dbp)) { + __db_errx(env, + "Multiversion queue databases are not supported"); + return (EINVAL); + } + + /* Initialize the remaining fields/methods of the DB. */ + dbp->db_am_remove = __qam_remove; + dbp->db_am_rename = __qam_rename; + + /* + * Get a cursor. If DB_CREATE is specified, we may be creating + * pages, and to do that safely in CDB we need a write cursor. + * In STD_LOCKING mode, we'll synchronize using the meta page + * lock instead. + */ + if ((ret = __db_cursor(dbp, ip, txn, &dbc, + LF_ISSET(DB_CREATE) && CDB_LOCKING(env) ? + DB_WRITECURSOR : 0)) != 0) + return (ret); + + /* + * Get the meta data page. It must exist, because creates of + * files/databases come in through the __qam_new_file interface + * and queue doesn't support subdatabases. + */ + if ((ret = + __db_lget(dbc, 0, base_pgno, DB_LOCK_READ, 0, &metalock)) != 0) + goto err; + if ((ret = __memp_fget(mpf, &base_pgno, ip, txn, 0, &qmeta)) != 0) + goto err; + + /* If the magic number is incorrect, that's a fatal error. */ + if (qmeta->dbmeta.magic != DB_QAMMAGIC) { + __db_errx(env, "__qam_open: %s: unexpected file type or format", + name); + ret = EINVAL; + goto err; + } + + /* Setup information needed to open extents. */ + t->page_ext = qmeta->page_ext; + + if (t->page_ext != 0 && (ret = __qam_set_ext_data(dbp, name)) != 0) + goto err; + + if (mode == 0) + mode = DB_MODE_660; + t->mode = mode; + t->re_pad = (int)qmeta->re_pad; + t->re_len = qmeta->re_len; + t->rec_page = qmeta->rec_page; + + t->q_meta = base_pgno; + t->q_root = base_pgno + 1; + +err: if (qmeta != NULL && (t_ret = + __memp_fput(mpf, ip, qmeta, dbc->priority)) != 0 && ret == 0) + ret = t_ret; + + /* Don't hold the meta page long term. */ + if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0) + ret = t_ret; + + if ((t_ret = __dbc_close(dbc)) != 0 && ret == 0) + ret = t_ret; + + return (ret); +} + +/* + * __qam_set_ext_data -- + * Setup DBP data for opening queue extents. 
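+ *	Saves a copy of the file name, splits it into directory and base name,
+ *	and fills in the DB_PGINFO page cookie used when extent files are opened.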
+ * + * PUBLIC: int __qam_set_ext_data __P((DB*, const char *)); + */ +int +__qam_set_ext_data(dbp, name) + DB *dbp; + const char *name; +{ + QUEUE *t; + int ret; + + t = dbp->q_internal; + t->pginfo.db_pagesize = dbp->pgsize; + t->pginfo.flags = + F_ISSET(dbp, (DB_AM_CHKSUM | DB_AM_ENCRYPT | DB_AM_SWAP)); + t->pginfo.type = dbp->type; + t->pgcookie.data = &t->pginfo; + t->pgcookie.size = sizeof(DB_PGINFO); + + if ((ret = __os_strdup(dbp->env, name, &t->path)) != 0) + return (ret); + t->dir = t->path; + if ((t->name = __db_rpath(t->path)) == NULL) { + t->name = t->path; + t->dir = PATH_DOT; + } else + *t->name++ = '\0'; + + return (0); +} + +/* + * __qam_metachk -- + * + * PUBLIC: int __qam_metachk __P((DB *, const char *, QMETA *)); + */ +int +__qam_metachk(dbp, name, qmeta) + DB *dbp; + const char *name; + QMETA *qmeta; +{ + ENV *env; + u_int32_t vers; + int ret; + + env = dbp->env; + ret = 0; + + /* + * At this point, all we know is that the magic number is for a Queue. + * Check the version, the database may be out of date. + */ + vers = qmeta->dbmeta.version; + if (F_ISSET(dbp, DB_AM_SWAP)) + M_32_SWAP(vers); + switch (vers) { + case 1: + case 2: + __db_errx(env, + "%s: queue version %lu requires a version upgrade", + name, (u_long)vers); + return (DB_OLD_VERSION); + case 3: + case 4: + break; + default: + __db_errx(env, + "%s: unsupported qam version: %lu", name, (u_long)vers); + return (EINVAL); + } + + /* Swap the page if we need to. */ + if (F_ISSET(dbp, DB_AM_SWAP) && + (ret = __qam_mswap(env, (PAGE *)qmeta)) != 0) + return (ret); + + /* Check the type. */ + if (dbp->type != DB_QUEUE && dbp->type != DB_UNKNOWN) + return (EINVAL); + dbp->type = DB_QUEUE; + DB_ILLEGAL_METHOD(dbp, DB_OK_QUEUE); + + /* Set the page size. */ + dbp->pgsize = qmeta->dbmeta.pagesize; + + /* Copy the file's ID. */ + memcpy(dbp->fileid, qmeta->dbmeta.uid, DB_FILE_ID_LEN); + + /* Set up AM-specific methods that do not require an open. */ + dbp->db_am_rename = __qam_rename; + dbp->db_am_remove = __qam_remove; + + return (ret); +} + +/* + * __qam_init_meta -- + * Initialize the meta-data for a Queue database. + */ +static int +__qam_init_meta(dbp, meta) + DB *dbp; + QMETA *meta; +{ + ENV *env; + QUEUE *t; + + env = dbp->env; + t = dbp->q_internal; + + memset(meta, 0, sizeof(QMETA)); + LSN_NOT_LOGGED(meta->dbmeta.lsn); + meta->dbmeta.pgno = PGNO_BASE_MD; + meta->dbmeta.last_pgno = 0; + meta->dbmeta.magic = DB_QAMMAGIC; + meta->dbmeta.version = DB_QAMVERSION; + meta->dbmeta.pagesize = dbp->pgsize; + if (F_ISSET(dbp, DB_AM_CHKSUM)) + FLD_SET(meta->dbmeta.metaflags, DBMETA_CHKSUM); + if (F_ISSET(dbp, DB_AM_ENCRYPT)) { + meta->dbmeta.encrypt_alg = env->crypto_handle->alg; + DB_ASSERT(env, meta->dbmeta.encrypt_alg != 0); + meta->crypto_magic = meta->dbmeta.magic; + } + meta->dbmeta.type = P_QAMMETA; + meta->re_pad = (u_int32_t)t->re_pad; + meta->re_len = t->re_len; + meta->rec_page = CALC_QAM_RECNO_PER_PAGE(dbp); + meta->cur_recno = 1; + meta->first_recno = 1; + meta->page_ext = t->page_ext; + t->rec_page = meta->rec_page; + memcpy(meta->dbmeta.uid, dbp->fileid, DB_FILE_ID_LEN); + + /* Verify that we can fit at least one record per page. */ + if (QAM_RECNO_PER_PAGE(dbp) < 1) { + __db_errx(env, + "Record size of %lu too large for page size of %lu", + (u_long)t->re_len, (u_long)dbp->pgsize); + return (EINVAL); + } + + return (0); +} + +/* + * __qam_new_file -- + * Create the necessary pages to begin a new queue database file. 
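+ *	Only the meta-data page is created: in-memory databases build it through
+ *	the memory pool, named files write it directly with __fop_write.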
+ * + * PUBLIC: int __qam_new_file __P((DB *, + * PUBLIC: DB_THREAD_INFO *, DB_TXN *, DB_FH *, const char *)); + */ +int +__qam_new_file(dbp, ip, txn, fhp, name) + DB *dbp; + DB_THREAD_INFO *ip; + DB_TXN *txn; + DB_FH *fhp; + const char *name; +{ + DBT pdbt; + DB_MPOOLFILE *mpf; + DB_PGINFO pginfo; + ENV *env; + QMETA *meta; + db_pgno_t pgno; + int ret, t_ret; + + /* + * Build meta-data page. + * + * This code appears more complex than it is because of the two cases + * (named and unnamed). + * + * For each page being created, there are three parts: 1) a "get page" + * chunk (which either uses malloc'd memory or calls __memp_fget), 2) + * the initialization, and 3) the "put page" chunk which either does a + * fop write or an __memp_fput. + */ + if (F_ISSET(dbp, DB_AM_INMEM)) { + mpf = dbp->mpf; + pgno = PGNO_BASE_MD; + if ((ret = __memp_fget(mpf, &pgno, ip, txn, + DB_MPOOL_CREATE | DB_MPOOL_DIRTY, &meta)) != 0) + return (ret); + + if ((ret = __qam_init_meta(dbp, meta)) != 0) + goto err1; + + if ((ret = __db_log_page(dbp, + txn, &meta->dbmeta.lsn, pgno, (PAGE *)meta)) != 0) + goto err1; +err1: if ((t_ret = + __memp_fput(mpf, ip, meta, dbp->priority)) != 0 && ret == 0) + ret = t_ret; + } else { + env = dbp->env; + if ((ret = __os_calloc(env, 1, dbp->pgsize, &meta)) != 0) + return (ret); + + if ((ret = __qam_init_meta(dbp, meta)) != 0) + goto err2; + + pginfo.db_pagesize = dbp->pgsize; + pginfo.flags = + F_ISSET(dbp, (DB_AM_CHKSUM | DB_AM_ENCRYPT | DB_AM_SWAP)); + pginfo.type = DB_QUEUE; + DB_SET_DBT(pdbt, &pginfo, sizeof(pginfo)); + if ((ret = + __db_pgout(env->dbenv, PGNO_BASE_MD, meta, &pdbt)) != 0) + goto err2; + ret = __fop_write(env, txn, name, dbp->dirname, + DB_APP_DATA, fhp, dbp->pgsize, 0, 0, meta, dbp->pgsize, 1, + F_ISSET(dbp, DB_AM_NOT_DURABLE) ? DB_LOG_NOT_DURABLE : 0); + +err2: __os_free(env, meta); + } + + return (ret); +} diff --git a/qam/qam_rec.c b/qam/qam_rec.c new file mode 100644 index 00000000..65ed0eac --- /dev/null +++ b/qam/qam_rec.c @@ -0,0 +1,663 @@ +/*- + * See the file LICENSE for redistribution information. + * + * Copyright (c) 1999-2009 Oracle. All rights reserved. + * + * $Id$ + */ + +#include "db_config.h" + +#include "db_int.h" +#include "dbinc/db_page.h" +#include "dbinc/db_am.h" +#include "dbinc/lock.h" +#include "dbinc/log.h" +#include "dbinc/mp.h" +#include "dbinc/qam.h" +#include "dbinc/txn.h" + +/* + * LSNs in queue data pages are advisory. They do not have to be accurate + * as all operations are idempotent on records. They should not be rolled + * forward during recovery as committed transaction may obscure updates from + * an incomplete transaction that updates the same page. The incomplete + * transaction may be completed during a later hot backup cycle. + */ + +/* Queue version of REC_DIRTY -- needs to probe the correct file. */ +#define QAM_DIRTY(dbc, pgno, pagep) \ + if ((ret = __qam_dirty((dbc), \ + pgno, pagep, (dbc)->priority)) != 0) { \ + ret = __db_pgerr((dbc)->dbp, (pgno), ret); \ + goto out; \ + } + +/* + * __qam_incfirst_recover -- + * Recovery function for incfirst. 
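+ *	Undo only moves meta->first_recno backwards, to pick up the aborted
+ *	delete; redo walks it forward again past records that are no longer valid.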
+ * + * PUBLIC: int __qam_incfirst_recover + * PUBLIC: __P((ENV *, DBT *, DB_LSN *, db_recops, void *)); + */ +int +__qam_incfirst_recover(env, dbtp, lsnp, op, info) + ENV *env; + DBT *dbtp; + DB_LSN *lsnp; + db_recops op; + void *info; +{ + __qam_incfirst_args *argp; + DB_THREAD_INFO *ip; + DB *file_dbp; + DBC *dbc; + DB_LOCK lock; + DB_LSN trunc_lsn; + DB_MPOOLFILE *mpf; + QMETA *meta; + QUEUE_CURSOR *cp; + db_pgno_t metapg; + u_int32_t rec_ext; + int exact, ret, t_ret; + + COMPQUIET(meta, NULL); + + ip = ((DB_TXNHEAD *)info)->thread_info; + LOCK_INIT(lock); + REC_PRINT(__qam_incfirst_print); + REC_INTRO(__qam_incfirst_read, ip, 1); + + metapg = ((QUEUE *)file_dbp->q_internal)->q_meta; + + if ((ret = __db_lget(dbc, + LCK_ROLLBACK, metapg, DB_LOCK_WRITE, 0, &lock)) != 0) + goto done; + if ((ret = __memp_fget(mpf, &metapg, ip, NULL, + 0, &meta)) != 0) { + if (DB_REDO(op)) { + if ((ret = __memp_fget(mpf, &metapg, ip, NULL, + DB_MPOOL_CREATE, &meta)) != 0) { + (void)__LPUT(dbc, lock); + goto out; + } + meta->dbmeta.pgno = metapg; + meta->dbmeta.type = P_QAMMETA; + } else { + *lsnp = argp->prev_lsn; + ret = __LPUT(dbc, lock); + goto out; + } + } + + /* + * Only move first_recno backwards so we pick up the aborted delete. + * When going forward we need to be careful since + * we may have bumped over a locked record. + */ + if (DB_UNDO(op)) { + if (QAM_BEFORE_FIRST(meta, argp->recno)) { + REC_DIRTY(mpf, ip, dbc->priority, &meta); + meta->first_recno = argp->recno; + } + + trunc_lsn = ((DB_TXNHEAD *)info)->trunc_lsn; + /* if we are truncating, update the LSN */ + if (!IS_ZERO_LSN(trunc_lsn) && + LOG_COMPARE(&LSN(meta), &trunc_lsn) > 0) { + REC_DIRTY(mpf, ip, dbc->priority, &meta); + LSN(meta) = trunc_lsn; + } + } else { + if (LOG_COMPARE(&LSN(meta), lsnp) < 0) { + REC_DIRTY(mpf, ip, dbc->priority, &meta); + LSN(meta) = *lsnp; + } + if (meta->page_ext == 0) + rec_ext = 0; + else + rec_ext = meta->page_ext * meta->rec_page; + cp = (QUEUE_CURSOR *)dbc->internal; + if (meta->first_recno == RECNO_OOB) + meta->first_recno++; + while (meta->first_recno != meta->cur_recno && + !QAM_BEFORE_FIRST(meta, argp->recno + 1)) { + if ((ret = __qam_position(dbc, + &meta->first_recno, 0, &exact)) != 0) + goto err; + if (cp->page != NULL && (ret = __qam_fput(dbc, + cp->pgno, cp->page, dbc->priority)) != 0) + goto err; + + if (exact == 1) + break; + if (cp->page != NULL && + rec_ext != 0 && meta->first_recno % rec_ext == 0) + if ((ret = + __qam_fremove(file_dbp, cp->pgno)) != 0) + goto err; + REC_DIRTY(mpf, ip, dbc->priority, &meta); + meta->first_recno++; + if (meta->first_recno == RECNO_OOB) + meta->first_recno++; + } + } + + ret = __memp_fput(mpf, ip, meta, dbc->priority); + if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0) + ret = t_ret; + if (ret != 0) + goto out; + +done: *lsnp = argp->prev_lsn; + ret = 0; + + if (0) { +err: (void)__memp_fput(mpf, ip, meta, dbc->priority); + (void)__LPUT(dbc, lock); + } + +out: REC_CLOSE; +} + +/* + * __qam_mvptr_recover -- + * Recovery function for mvptr. 
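+ *	A mvptr record tracks movement of the meta-page first_recno and
+ *	cur_recno pointers (QAM_SETFIRST, QAM_SETCUR) and truncates (QAM_TRUNCATE).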
+ * + * PUBLIC: int __qam_mvptr_recover + * PUBLIC: __P((ENV *, DBT *, DB_LSN *, db_recops, void *)); + */ +int +__qam_mvptr_recover(env, dbtp, lsnp, op, info) + ENV *env; + DBT *dbtp; + DB_LSN *lsnp; + db_recops op; + void *info; +{ + __qam_mvptr_args *argp; + DB_THREAD_INFO *ip; + DB *file_dbp; + DBC *dbc; + DB_LSN trunc_lsn; + DB_LOCK lock; + DB_MPOOLFILE *mpf; + QMETA *meta; + QUEUE_CURSOR *cp; + db_pgno_t metapg; + int cmp_n, cmp_p, exact, ret; + + ip = ((DB_TXNHEAD *)info)->thread_info; + REC_PRINT(__qam_mvptr_print); + REC_INTRO(__qam_mvptr_read, ip, 1); + + metapg = ((QUEUE *)file_dbp->q_internal)->q_meta; + + if ((ret = __db_lget(dbc, + LCK_ROLLBACK, metapg, DB_LOCK_WRITE, 0, &lock)) != 0) + goto done; + if ((ret = __memp_fget(mpf, &metapg, ip, NULL, 0, &meta)) != 0) { + if (DB_REDO(op)) { + if ((ret = __memp_fget(mpf, &metapg, ip, NULL, + DB_MPOOL_CREATE, &meta)) != 0) { + (void)__LPUT(dbc, lock); + goto out; + } + meta->dbmeta.pgno = metapg; + meta->dbmeta.type = P_QAMMETA; + } else { + *lsnp = argp->prev_lsn; + ret = __LPUT(dbc, lock); + goto out; + } + } + + cmp_n = LOG_COMPARE(lsnp, &LSN(meta)); + cmp_p = LOG_COMPARE(&LSN(meta), &argp->metalsn); + + /* + * Under normal circumstances, we never undo a movement of one of + * the pointers. Just move them along regardless of abort/commit. + * When going forward we need to verify that this is really where + * the pointer belongs. A transaction may roll back and reinsert + * a record that was missing at the time of this action. + * + * If we're undoing a truncate, we need to reset the pointers to + * their state before the truncate. + */ + if (DB_UNDO(op)) { + if ((argp->opcode & QAM_TRUNCATE) && cmp_n <= 0) { + REC_DIRTY(mpf, ip, dbc->priority, &meta); + meta->first_recno = argp->old_first; + meta->cur_recno = argp->old_cur; + LSN(meta) = argp->metalsn; + } + /* If the page lsn is beyond the truncate point, move it back */ + trunc_lsn = ((DB_TXNHEAD *)info)->trunc_lsn; + if (!IS_ZERO_LSN(trunc_lsn) && + LOG_COMPARE(&trunc_lsn, &LSN(meta)) < 0) { + REC_DIRTY(mpf, ip, dbc->priority, &meta); + LSN(meta) = argp->metalsn; + } + } else if (op == DB_TXN_APPLY || cmp_p == 0) { + REC_DIRTY(mpf, ip, dbc->priority, &meta); + cp = (QUEUE_CURSOR *)dbc->internal; + if ((argp->opcode & QAM_SETFIRST) && + meta->first_recno == argp->old_first) { + if (argp->old_first > argp->new_first) + meta->first_recno = argp->new_first; + else { + if ((ret = __qam_position(dbc, + &meta->first_recno, 0, &exact)) != 0) + goto err; + if (!exact) + meta->first_recno = argp->new_first; + if (cp->page != NULL && + (ret = __qam_fput(dbc, + cp->pgno, cp->page, dbc->priority)) != 0) + goto err; + } + } + + if ((argp->opcode & QAM_SETCUR) && + meta->cur_recno == argp->old_cur) { + if (argp->old_cur < argp->new_cur) + meta->cur_recno = argp->new_cur; + else { + if ((ret = __qam_position(dbc, + &meta->cur_recno, 0, &exact)) != 0) + goto err; + if (!exact) + meta->cur_recno = argp->new_cur; + if (cp->page != NULL && + (ret = __qam_fput(dbc, + cp->pgno, cp->page, dbc->priority)) != 0) + goto err; + } + } + + meta->dbmeta.lsn = *lsnp; + } + + if ((ret = __memp_fput(mpf, ip, meta, dbc->priority)) != 0) + goto out; + + if ((ret = __LPUT(dbc, lock)) != 0) + goto out; + +done: *lsnp = argp->prev_lsn; + ret = 0; + + if (0) { +err: (void)__memp_fput(mpf, ip, meta, dbc->priority); + (void)__LPUT(dbc, lock); + } + +out: REC_CLOSE; +} + +/* + * __qam_del_recover -- + * Recovery function for del. + * Non-extent version or if there is no data (zero len). 
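+ *	Undo marks the record valid again; redo clears its QAM_VALID bit.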
+ * + * PUBLIC: int __qam_del_recover + * PUBLIC: __P((ENV *, DBT *, DB_LSN *, db_recops, void *)); + */ +int +__qam_del_recover(env, dbtp, lsnp, op, info) + ENV *env; + DBT *dbtp; + DB_LSN *lsnp; + db_recops op; + void *info; +{ + __qam_del_args *argp; + DB_THREAD_INFO *ip; + DB *file_dbp; + DBC *dbc; + DB_LOCK lock; + DB_MPOOLFILE *mpf; + QAMDATA *qp; + QMETA *meta; + QPAGE *pagep; + db_pgno_t metapg; + int cmp_n, ret, t_ret; + + COMPQUIET(pagep, NULL); + LOCK_INIT(lock); + meta = NULL; + pagep = NULL; + + ip = ((DB_TXNHEAD *)info)->thread_info; + REC_PRINT(__qam_del_print); + REC_INTRO(__qam_del_read, ip, 1); + + /* Lock the meta page before latching the page. */ + if (DB_UNDO(op)) { + metapg = ((QUEUE *)file_dbp->q_internal)->q_meta; + if ((ret = __db_lget(dbc, + LCK_ROLLBACK, metapg, DB_LOCK_WRITE, 0, &lock)) != 0) + goto out; + if ((ret = __memp_fget(mpf, &metapg, ip, NULL, + DB_MPOOL_EDIT, &meta)) != 0) + goto err; + } + + if ((ret = __qam_fget(dbc, &argp->pgno, DB_MPOOL_CREATE, &pagep)) != 0) + goto err; + + if (pagep->pgno == PGNO_INVALID) { + QAM_DIRTY(dbc, argp->pgno, &pagep); + pagep->pgno = argp->pgno; + pagep->type = P_QAMDATA; + } + + cmp_n = LOG_COMPARE(lsnp, &LSN(pagep)); + + if (DB_UNDO(op)) { + /* make sure first is behind us */ + if (meta->first_recno == RECNO_OOB || + (QAM_BEFORE_FIRST(meta, argp->recno) && + (meta->first_recno <= meta->cur_recno || + meta->first_recno - + argp->recno < argp->recno - meta->cur_recno))) { + REC_DIRTY(mpf, ip, dbc->priority, &meta); + meta->first_recno = argp->recno; + } + + /* Need to undo delete - mark the record as present */ + QAM_DIRTY(dbc, pagep->pgno, &pagep); + qp = QAM_GET_RECORD(file_dbp, pagep, argp->indx); + F_SET(qp, QAM_VALID); + + /* + * Move the LSN back to this point; do not move it forward. + * If we're in an abort, because we don't hold a page lock, + * we could foul up a concurrent put. Having too late an + * LSN * is harmless in queue except when we're determining + * what we need to roll forward during recovery. [#2588] + */ + if (cmp_n <= 0 && op == DB_TXN_BACKWARD_ROLL) + LSN(pagep) = argp->lsn; + } else if (op == DB_TXN_APPLY || (cmp_n > 0 && DB_REDO(op))) { + /* Need to redo delete - clear the valid bit */ + QAM_DIRTY(dbc, pagep->pgno, &pagep); + qp = QAM_GET_RECORD(file_dbp, pagep, argp->indx); + F_CLR(qp, QAM_VALID); + /* + * We only move the LSN forward during replication. + * During recovery we could obscure an update from + * a partially completed transaction while processing + * a hot backup. [#13823] + */ + if (op == DB_TXN_APPLY) + LSN(pagep) = *lsnp; + } + +done: *lsnp = argp->prev_lsn; + ret = 0; + +err: if (pagep != NULL && (t_ret = + __qam_fput(dbc, argp->pgno, pagep, dbc->priority)) != 0 && ret == 0) + ret = t_ret; + if (meta != NULL && (t_ret = + __memp_fput(mpf, ip, meta, dbc->priority)) != 0 && ret == 0) + ret = t_ret; + if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0) + ret = t_ret; +out: REC_CLOSE; +} + +/* + * __qam_delext_recover -- + * Recovery function for del in an extent based queue. 
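+ *	Like __qam_del_recover, except the record's data is part of the log
+ *	record, so undo restores it with __qam_pitem.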
+ * + * PUBLIC: int __qam_delext_recover + * PUBLIC: __P((ENV *, DBT *, DB_LSN *, db_recops, void *)); + */ +int +__qam_delext_recover(env, dbtp, lsnp, op, info) + ENV *env; + DBT *dbtp; + DB_LSN *lsnp; + db_recops op; + void *info; +{ + __qam_delext_args *argp; + DB_THREAD_INFO *ip; + DB *file_dbp; + DBC *dbc; + DB_LOCK lock; + DB_MPOOLFILE *mpf; + QAMDATA *qp; + QMETA *meta; + QPAGE *pagep; + db_pgno_t metapg; + int cmp_n, ret, t_ret; + + COMPQUIET(pagep, NULL); + LOCK_INIT(lock); + meta = NULL; + pagep = NULL; + + ip = ((DB_TXNHEAD *)info)->thread_info; + REC_PRINT(__qam_delext_print); + REC_INTRO(__qam_delext_read, ip, 1); + + if (DB_UNDO(op)) { + metapg = ((QUEUE *)file_dbp->q_internal)->q_meta; + if ((ret = __db_lget(dbc, + LCK_ROLLBACK, metapg, DB_LOCK_WRITE, 0, &lock)) != 0) + goto err; + if ((ret = __memp_fget(mpf, &metapg, ip, NULL, + DB_MPOOL_EDIT, &meta)) != 0) { + (void)__LPUT(dbc, lock); + goto err; + } + } + + if ((ret = __qam_fget(dbc, &argp->pgno, + DB_REDO(op) ? 0 : DB_MPOOL_CREATE, &pagep)) != 0) { + /* + * If we are redoing a delete and the page is not there + * we are done. + */ + if (DB_REDO(op) && (ret == DB_PAGE_NOTFOUND || ret == ENOENT)) + goto done; + goto out; + } + + if (pagep->pgno == PGNO_INVALID) { + QAM_DIRTY(dbc, argp->pgno, &pagep); + pagep->pgno = argp->pgno; + pagep->type = P_QAMDATA; + } + + cmp_n = LOG_COMPARE(lsnp, &LSN(pagep)); + + if (DB_UNDO(op)) { + /* make sure first is behind us */ + if (meta->first_recno == RECNO_OOB || + (QAM_BEFORE_FIRST(meta, argp->recno) && + (meta->first_recno <= meta->cur_recno || + meta->first_recno - + argp->recno < argp->recno - meta->cur_recno))) { + meta->first_recno = argp->recno; + } + + QAM_DIRTY(dbc, pagep->pgno, &pagep); + if ((ret = __qam_pitem(dbc, pagep, + argp->indx, argp->recno, &argp->data)) != 0) + goto err; + + /* + * Move the LSN back to this point; do not move it forward. + * If we're in an abort, because we don't hold a page lock, + * we could foul up a concurrent put. Having too late an + * LSN is harmless in queue except when we're determining + * what we need to roll forward during recovery. [#2588] + */ + if (cmp_n <= 0 && op == DB_TXN_BACKWARD_ROLL) + LSN(pagep) = argp->lsn; + } else if (op == DB_TXN_APPLY || (cmp_n > 0 && DB_REDO(op))) { + QAM_DIRTY(dbc, pagep->pgno, &pagep); + /* Need to redo delete - clear the valid bit */ + qp = QAM_GET_RECORD(file_dbp, pagep, argp->indx); + F_CLR(qp, QAM_VALID); + /* + * We only move the LSN forward during replication. + * During recovery we could obscure an update from + * a partially completed transaction while processing + * a hot backup. [#13823] + */ + if (op == DB_TXN_APPLY) + LSN(pagep) = *lsnp; + } + +done: *lsnp = argp->prev_lsn; + ret = 0; + +err: if (pagep != NULL && (t_ret = + __qam_fput(dbc, argp->pgno, pagep, dbc->priority)) != 0 && ret == 0) + ret = t_ret; + if (meta != NULL && (t_ret = + __memp_fput(mpf, ip, meta, dbc->priority)) != 0 && ret == 0) + ret = t_ret; + if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0) + ret = t_ret; + +out: REC_CLOSE; +} + +/* + * __qam_add_recover -- + * Recovery function for add. 
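+ *	Redo re-puts the record and updates the meta-page record pointers to
+ *	cover it; undo restores any overwritten data or clears the record's flags.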
+ * + * PUBLIC: int __qam_add_recover + * PUBLIC: __P((ENV *, DBT *, DB_LSN *, db_recops, void *)); + */ +int +__qam_add_recover(env, dbtp, lsnp, op, info) + ENV *env; + DBT *dbtp; + DB_LSN *lsnp; + db_recops op; + void *info; +{ + __qam_add_args *argp; + DB_THREAD_INFO *ip; + DB *file_dbp; + DBC *dbc; + DB_MPOOLFILE *mpf; + QAMDATA *qp; + QMETA *meta; + QPAGE *pagep; + db_pgno_t metapg; + int cmp_n, ret; + + COMPQUIET(pagep, NULL); + + ip = ((DB_TXNHEAD *)info)->thread_info; + REC_PRINT(__qam_add_print); + REC_INTRO(__qam_add_read, ip, 1); + + if ((ret = __qam_fget(dbc, &argp->pgno, + DB_UNDO(op) ? 0 : DB_MPOOL_CREATE, &pagep)) != 0) { + /* + * If we are undoing an append and the page is not there + * we are done. + */ + if (DB_UNDO(op) && (ret == DB_PAGE_NOTFOUND || ret == ENOENT)) + goto done; + goto out; + } + + if (pagep->pgno == PGNO_INVALID) { + QAM_DIRTY(dbc, argp->pgno, &pagep); + pagep->pgno = argp->pgno; + pagep->type = P_QAMDATA; + } + + cmp_n = LOG_COMPARE(lsnp, &LSN(pagep)); + + if (DB_REDO(op)) { + /* Fix meta-data page. */ + metapg = ((QUEUE *)file_dbp->q_internal)->q_meta; + if ((ret = __memp_fget(mpf, &metapg, ip, NULL, + 0, &meta)) != 0) + goto err; + if (QAM_BEFORE_FIRST(meta, argp->recno)) { + REC_DIRTY(mpf, ip, dbc->priority, &meta); + meta->first_recno = argp->recno; + } + if (argp->recno == meta->cur_recno || + QAM_AFTER_CURRENT(meta, argp->recno)) { + REC_DIRTY(mpf, ip, dbc->priority, &meta); + meta->cur_recno = argp->recno + 1; + } + if ((ret = __memp_fput(mpf, ip, meta, dbc->priority)) != 0) + goto err; + + /* Now update the actual page if necessary. */ + if (op == DB_TXN_APPLY || cmp_n > 0) { + QAM_DIRTY(dbc, pagep->pgno, &pagep); + /* Need to redo add - put the record on page */ + if ((ret = __qam_pitem(dbc, + pagep, argp->indx, argp->recno, &argp->data)) != 0) + goto err; + /* + * We only move the LSN forward during replication. + * During recovery we could obscure an update from + * a partially completed transaction while processing + * a hot backup. [#13823] + */ + if (op == DB_TXN_APPLY) + LSN(pagep) = *lsnp; + } + } else if (DB_UNDO(op)) { + /* + * Need to undo add + * If this was an overwrite, put old record back. + * Otherwise just clear the valid bit + */ + if (argp->olddata.size != 0) { + QAM_DIRTY(dbc, pagep->pgno, &pagep); + if ((ret = __qam_pitem(dbc, pagep, + argp->indx, argp->recno, &argp->olddata)) != 0) + goto err; + + if (!(argp->vflag & QAM_VALID)) { + qp = QAM_GET_RECORD( + file_dbp, pagep, argp->indx); + F_CLR(qp, QAM_VALID); + } + } else { + QAM_DIRTY(dbc, pagep->pgno, &pagep); + qp = QAM_GET_RECORD(file_dbp, pagep, argp->indx); + qp->flags = 0; + } + + /* + * Move the LSN back to this point; do not move it forward. + * If we're in an abort, because we don't hold a page lock, + * we could foul up a concurrent put. Having too late an + * LSN is harmless in queue except when we're determining + * what we need to roll forward during recovery. [#2588] + */ + if (cmp_n <= 0 && op == DB_TXN_BACKWARD_ROLL) + LSN(pagep) = argp->lsn; + } + + if ((ret = __qam_fput(dbc, argp->pgno, pagep, dbc->priority)) != 0) + goto out; + +done: *lsnp = argp->prev_lsn; + ret = 0; + + if (0) { +err: (void)__qam_fput(dbc, argp->pgno, pagep, dbc->priority); + } + +out: REC_CLOSE; +} diff --git a/qam/qam_stat.c b/qam/qam_stat.c new file mode 100644 index 00000000..15627caf --- /dev/null +++ b/qam/qam_stat.c @@ -0,0 +1,253 @@ +/*- + * See the file LICENSE for redistribution information. + * + * Copyright (c) 1999-2009 Oracle. All rights reserved. 
+ * + * $Id$ + */ + +#include "db_config.h" + +#include "db_int.h" +#include "dbinc/db_page.h" +#include "dbinc/db_am.h" +#include "dbinc/lock.h" +#include "dbinc/mp.h" +#include "dbinc/qam.h" + +#ifdef HAVE_STATISTICS +/* + * __qam_stat -- + * Gather/print the qam statistics + * + * PUBLIC: int __qam_stat __P((DBC *, void *, u_int32_t)); + */ +int +__qam_stat(dbc, spp, flags) + DBC *dbc; + void *spp; + u_int32_t flags; +{ + DB *dbp; + DB_LOCK lock; + DB_MPOOLFILE *mpf; + DB_QUEUE_STAT *sp; + PAGE *h; + QAMDATA *qp, *ep; + QMETA *meta; + QUEUE *t; + db_indx_t indx; + db_pgno_t first, last, pgno, pg_ext, stop; + u_int32_t re_len; + int ret, t_ret; + + dbp = dbc->dbp; + + LOCK_INIT(lock); + mpf = dbp->mpf; + sp = NULL; + t = dbp->q_internal; + + if (spp == NULL) + return (0); + + /* Allocate and clear the structure. */ + if ((ret = __os_umalloc(dbp->env, sizeof(*sp), &sp)) != 0) + goto err; + memset(sp, 0, sizeof(*sp)); + + re_len = ((QUEUE *)dbp->q_internal)->re_len; + + /* Determine the last page of the database. */ + if ((ret = __db_lget(dbc, 0, t->q_meta, DB_LOCK_READ, 0, &lock)) != 0) + goto err; + if ((ret = __memp_fget(mpf, &t->q_meta, + dbc->thread_info, dbc->txn, 0, &meta)) != 0) + goto err; + + if (flags == DB_FAST_STAT) { + sp->qs_nkeys = meta->dbmeta.key_count; + sp->qs_ndata = meta->dbmeta.record_count; + goto meta_only; + } + + first = QAM_RECNO_PAGE(dbp, meta->first_recno); + last = QAM_RECNO_PAGE(dbp, meta->cur_recno); + + ret = __memp_fput(mpf, dbc->thread_info, meta, dbc->priority); + if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0) + ret = t_ret; + if (ret != 0) + goto err; + + pgno = first; + if (first > last) + stop = QAM_RECNO_PAGE(dbp, UINT32_MAX); + else + stop = last; + + /* Dump each page. */ + pg_ext = ((QUEUE *)dbp->q_internal)->page_ext; +begin: + /* Walk through the pages and count. */ + for (; pgno <= stop; ++pgno) { + if ((ret = + __db_lget(dbc, 0, pgno, DB_LOCK_READ, 0, &lock)) != 0) + goto err; + ret = __qam_fget(dbc, &pgno, 0, &h); + if (ret == ENOENT) { + pgno += pg_ext - 1; + continue; + } + if (ret == DB_PAGE_NOTFOUND) { + if (pg_ext == 0) { + if (pgno != stop && first != last) + goto err; + ret = 0; + break; + } + pgno += (pg_ext - ((pgno - 1) % pg_ext)) - 1; + continue; + } + if (ret != 0) + goto err; + + ++sp->qs_pages; + + ep = (QAMDATA *)((u_int8_t *)h + dbp->pgsize - re_len); + for (indx = 0, qp = QAM_GET_RECORD(dbp, h, indx); + qp <= ep; + ++indx, qp = QAM_GET_RECORD(dbp, h, indx)) { + if (F_ISSET(qp, QAM_VALID)) + sp->qs_ndata++; + else + sp->qs_pgfree += re_len; + } + + ret = __qam_fput(dbc, pgno, h, dbc->priority); + if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0) + ret = t_ret; + if (ret != 0) + goto err; + } + + if ((ret = __LPUT(dbc, lock)) != 0) + goto err; + if (first > last) { + pgno = 1; + stop = last; + first = last; + goto begin; + } + + /* Get the meta-data page. */ + if ((ret = __db_lget(dbc, + 0, t->q_meta, F_ISSET(dbp, DB_AM_RDONLY) ? + DB_LOCK_READ : DB_LOCK_WRITE, 0, &lock)) != 0) + goto err; + if ((ret = __memp_fget(mpf, &t->q_meta, dbc->thread_info, dbc->txn, + F_ISSET(dbp, DB_AM_RDONLY) ? 0 : DB_MPOOL_DIRTY, &meta)) != 0) + goto err; + + if (!F_ISSET(dbp, DB_AM_RDONLY)) + meta->dbmeta.key_count = + meta->dbmeta.record_count = sp->qs_ndata; + sp->qs_nkeys = sp->qs_ndata; + +meta_only: + /* Get the metadata fields. 
*/ + sp->qs_magic = meta->dbmeta.magic; + sp->qs_version = meta->dbmeta.version; + sp->qs_metaflags = meta->dbmeta.flags; + sp->qs_pagesize = meta->dbmeta.pagesize; + sp->qs_extentsize = meta->page_ext; + sp->qs_re_len = meta->re_len; + sp->qs_re_pad = meta->re_pad; + sp->qs_first_recno = meta->first_recno; + sp->qs_cur_recno = meta->cur_recno; + + /* Discard the meta-data page. */ + ret = __memp_fput(mpf, dbc->thread_info, meta, dbc->priority); + if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0) + ret = t_ret; + if (ret != 0) + goto err; + + *(DB_QUEUE_STAT **)spp = sp; + + if (0) { +err: if (sp != NULL) + __os_ufree(dbp->env, sp); + } + + if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0) + ret = t_ret; + + return (ret); +} + +/* + * __qam_stat_print -- + * Display queue statistics. + * + * PUBLIC: int __qam_stat_print __P((DBC *, u_int32_t)); + */ +int +__qam_stat_print(dbc, flags) + DBC *dbc; + u_int32_t flags; +{ + DB *dbp; + DB_QUEUE_STAT *sp; + ENV *env; + int ret; + + dbp = dbc->dbp; + env = dbp->env; + + if ((ret = __qam_stat(dbc, &sp, LF_ISSET(DB_FAST_STAT))) != 0) + return (ret); + + if (LF_ISSET(DB_STAT_ALL)) { + __db_msg(env, "%s", DB_GLOBAL(db_line)); + __db_msg(env, "Default Queue database information:"); + } + __db_msg(env, "%lx\tQueue magic number", (u_long)sp->qs_magic); + __db_msg(env, "%lu\tQueue version number", (u_long)sp->qs_version); + __db_dl(env, "Fixed-length record size", (u_long)sp->qs_re_len); + __db_msg(env, "%#x\tFixed-length record pad", (int)sp->qs_re_pad); + __db_dl(env, + "Underlying database page size", (u_long)sp->qs_pagesize); + __db_dl(env, + "Underlying database extent size", (u_long)sp->qs_extentsize); + __db_dl(env, + "Number of records in the database", (u_long)sp->qs_nkeys); + __db_dl(env, "Number of database pages", (u_long)sp->qs_pages); + __db_dl_pct(env, + "Number of bytes free in database pages", + (u_long)sp->qs_pgfree, + DB_PCT_PG(sp->qs_pgfree, sp->qs_pages, sp->qs_pagesize), "ff"); + __db_msg(env, + "%lu\tFirst undeleted record", (u_long)sp->qs_first_recno); + __db_msg(env, + "%lu\tNext available record number", (u_long)sp->qs_cur_recno); + + __os_ufree(env, sp); + + return (0); +} + +#else /* !HAVE_STATISTICS */ + +int +__qam_stat(dbc, spp, flags) + DBC *dbc; + void *spp; + u_int32_t flags; +{ + COMPQUIET(spp, NULL); + COMPQUIET(flags, 0); + + return (__db_stat_not_built(dbc->env)); +} +#endif diff --git a/qam/qam_stub.c b/qam/qam_stub.c new file mode 100644 index 00000000..5e211fb4 --- /dev/null +++ b/qam/qam_stub.c @@ -0,0 +1,340 @@ +/*- + * See the file LICENSE for redistribution information. + * + * Copyright (c) 1996-2009 Oracle. All rights reserved. + * + * $Id$ + */ + +#ifndef HAVE_QUEUE +#include "db_config.h" + +#include "db_int.h" +#include "dbinc/db_page.h" +#include "dbinc/qam.h" + +/* + * If the library wasn't compiled with the Queue access method, various + * routines aren't available. Stub them here, returning an appropriate + * error. + */ + +/* + * __db_no_queue_am -- + * Error when a Berkeley DB build doesn't include the access method. 
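+ *	The stub routines below that cannot trivially succeed report their
+ *	error through this function, which returns DB_OPNOTSUP.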
+ * + * PUBLIC: int __db_no_queue_am __P((ENV *)); + */ +int +__db_no_queue_am(env) + ENV *env; +{ + __db_errx(env, + "library build did not include support for the Queue access method"); + return (DB_OPNOTSUP); +} + +int +__db_prqueue(dbp, flags) + DB *dbp; + u_int32_t flags; +{ + COMPQUIET(flags, 0); + return (__db_no_queue_am(dbp->env)); +} + +int +__qam_31_qammeta(dbp, real_name, buf) + DB *dbp; + char *real_name; + u_int8_t *buf; +{ + COMPQUIET(real_name, NULL); + COMPQUIET(buf, NULL); + return (__db_no_queue_am(dbp->env)); +} + +int +__qam_32_qammeta(dbp, real_name, buf) + DB *dbp; + char *real_name; + u_int8_t *buf; +{ + COMPQUIET(real_name, NULL); + COMPQUIET(buf, NULL); + return (__db_no_queue_am(dbp->env)); +} + +int +__qam_append(dbc, key, data) + DBC *dbc; + DBT *key, *data; +{ + COMPQUIET(key, NULL); + COMPQUIET(data, NULL); + return (__db_no_queue_am(dbc->env)); +} + +int +__qamc_dup(orig_dbc, new_dbc) + DBC *orig_dbc, *new_dbc; +{ + COMPQUIET(new_dbc, NULL); + return (__db_no_queue_am(orig_dbc->env)); +} + +int +__qamc_init(dbc) + DBC *dbc; +{ + return (__db_no_queue_am(dbc->env)); +} + +int +__qam_db_close(dbp, flags) + DB *dbp; + u_int32_t flags; +{ + COMPQUIET(dbp, NULL); + COMPQUIET(flags, 0); + return (0); +} + +int +__qam_db_create(dbp) + DB *dbp; +{ + COMPQUIET(dbp, NULL); + return (0); +} + +int +__qam_extent_names(env, name, namelistp) + ENV *env; + char *name; + char ***namelistp; +{ + COMPQUIET(name, NULL); + COMPQUIET(namelistp, NULL); + return (__db_no_queue_am(env)); +} + +int +__qam_gen_filelist(dbp, ip, filelistp) + DB *dbp; + DB_THREAD_INFO *ip; + QUEUE_FILELIST **filelistp; +{ + COMPQUIET(ip, NULL); + COMPQUIET(filelistp, NULL); + return (__db_no_queue_am(dbp->env)); +} + +int +__qam_init_print(env, dtabp) + ENV *env; + DB_DISTAB *dtabp; +{ + COMPQUIET(env, NULL); + COMPQUIET(dtabp, NULL); + return (0); +} + +int +__qam_init_recover(env, dtabp) + ENV *env; + DB_DISTAB *dtabp; +{ + COMPQUIET(env, NULL); + COMPQUIET(dtabp, NULL); + return (0); +} + +int +__qam_metachk(dbp, name, qmeta) + DB *dbp; + const char *name; + QMETA *qmeta; +{ + COMPQUIET(name, NULL); + COMPQUIET(qmeta, NULL); + return (__db_no_queue_am(dbp->env)); +} + +int +__qam_mswap(env, pg) + ENV *env; + PAGE *pg; +{ + COMPQUIET(pg, NULL); + return (__db_no_queue_am(env)); +} + +int +__qam_new_file(dbp, ip, txn, fhp, name) + DB *dbp; + DB_THREAD_INFO *ip; + DB_TXN *txn; + DB_FH *fhp; + const char *name; +{ + COMPQUIET(ip, NULL); + COMPQUIET(txn, NULL); + COMPQUIET(fhp, NULL); + COMPQUIET(name, NULL); + return (__db_no_queue_am(dbp->env)); +} + +int +__qam_open(dbp, ip, txn, name, base_pgno, mode, flags) + DB *dbp; + DB_THREAD_INFO *ip; + DB_TXN *txn; + const char *name; + db_pgno_t base_pgno; + int mode; + u_int32_t flags; +{ + COMPQUIET(ip, NULL); + COMPQUIET(txn, NULL); + COMPQUIET(name, NULL); + COMPQUIET(base_pgno, 0); + COMPQUIET(mode, 0); + COMPQUIET(flags, 0); + return (__db_no_queue_am(dbp->env)); +} + +int +__qam_pgin_out(env, pg, pp, cookie) + ENV *env; + db_pgno_t pg; + void *pp; + DBT *cookie; +{ + COMPQUIET(pg, 0); + COMPQUIET(pp, NULL); + COMPQUIET(cookie, NULL); + return (__db_no_queue_am(env)); +} + +int +__qam_salvage(dbp, vdp, pgno, h, handle, callback, flags) + DB *dbp; + VRFY_DBINFO *vdp; + db_pgno_t pgno; + PAGE *h; + void *handle; + int (*callback) __P((void *, const void *)); + u_int32_t flags; +{ + COMPQUIET(vdp, NULL); + COMPQUIET(pgno, 0); + COMPQUIET(h, NULL); + COMPQUIET(handle, NULL); + COMPQUIET(callback, NULL); + COMPQUIET(flags, 0); + return 
(__db_no_queue_am(dbp->env)); +} + +int +__qam_set_ext_data(dbp, name) + DB *dbp; + const char *name; +{ + COMPQUIET(name, NULL); + return (__db_no_queue_am(dbp->env)); +} + +int +__qam_stat(dbc, spp, flags) + DBC *dbc; + void *spp; + u_int32_t flags; +{ + COMPQUIET(spp, NULL); + COMPQUIET(flags, 0); + return (__db_no_queue_am(dbc->env)); +} + +int +__qam_stat_print(dbc, flags) + DBC *dbc; + u_int32_t flags; +{ + COMPQUIET(flags, 0); + return (__db_no_queue_am(dbc->env)); +} + +int +__qam_sync(dbp) + DB *dbp; +{ + return (__db_no_queue_am(dbp->env)); +} + +int +__qam_truncate(dbc, countp) + DBC *dbc; + u_int32_t *countp; +{ + COMPQUIET(dbc, NULL); + COMPQUIET(countp, NULL); + return (__db_no_queue_am(dbc->env)); +} + +int +__qam_vrfy_data(dbp, vdp, h, pgno, flags) + DB *dbp; + VRFY_DBINFO *vdp; + QPAGE *h; + db_pgno_t pgno; + u_int32_t flags; +{ + COMPQUIET(vdp, NULL); + COMPQUIET(h, NULL); + COMPQUIET(pgno, 0); + COMPQUIET(flags, 0); + return (__db_no_queue_am(dbp->env)); +} + +int +__qam_vrfy_meta(dbp, vdp, meta, pgno, flags) + DB *dbp; + VRFY_DBINFO *vdp; + QMETA *meta; + db_pgno_t pgno; + u_int32_t flags; +{ + COMPQUIET(vdp, NULL); + COMPQUIET(meta, NULL); + COMPQUIET(pgno, 0); + COMPQUIET(flags, 0); + return (__db_no_queue_am(dbp->env)); +} + +int +__qam_vrfy_structure(dbp, vdp, flags) + DB *dbp; + VRFY_DBINFO *vdp; + u_int32_t flags; +{ + COMPQUIET(vdp, NULL); + COMPQUIET(flags, 0); + return (__db_no_queue_am(dbp->env)); +} + +int +__qam_vrfy_walkqueue(dbp, vdp, handle, callback, flags) + DB *dbp; + VRFY_DBINFO *vdp; + void *handle; + int (*callback) __P((void *, const void *)); + u_int32_t flags; +{ + COMPQUIET(vdp, NULL); + COMPQUIET(handle, NULL); + COMPQUIET(callback, NULL); + COMPQUIET(flags, 0); + return (__db_no_queue_am(dbp->env)); +} +#endif /* !HAVE_QUEUE */ diff --git a/qam/qam_upgrade.c b/qam/qam_upgrade.c new file mode 100644 index 00000000..d872f53c --- /dev/null +++ b/qam/qam_upgrade.c @@ -0,0 +1,101 @@ +/*- + * See the file LICENSE for redistribution information. + * + * Copyright (c) 1996-2009 Oracle. All rights reserved. + * + * $Id$ + */ + +#include "db_config.h" + +#include "db_int.h" +#include "dbinc/db_upgrade.h" +#include "dbinc/db_page.h" +#include "dbinc/qam.h" + +/* + * __qam_31_qammeta -- + * Upgrade the database from version 1 to version 2. + * + * PUBLIC: int __qam_31_qammeta __P((DB *, char *, u_int8_t *)); + */ +int +__qam_31_qammeta(dbp, real_name, buf) + DB *dbp; + char *real_name; + u_int8_t *buf; +{ + QMETA30 *oldmeta; + QMETA31 *newmeta; + + COMPQUIET(dbp, NULL); + COMPQUIET(real_name, NULL); + + newmeta = (QMETA31 *)buf; + oldmeta = (QMETA30 *)buf; + + /* + * Copy the fields to their new locations. + * They may overlap so start at the bottom and use memmove(). + */ + newmeta->rec_page = oldmeta->rec_page; + newmeta->re_pad = oldmeta->re_pad; + newmeta->re_len = oldmeta->re_len; + newmeta->cur_recno = oldmeta->cur_recno; + newmeta->first_recno = oldmeta->first_recno; + newmeta->start = oldmeta->start; + memmove(newmeta->dbmeta.uid, + oldmeta->dbmeta.uid, sizeof(oldmeta->dbmeta.uid)); + newmeta->dbmeta.flags = oldmeta->dbmeta.flags; + newmeta->dbmeta.record_count = 0; + newmeta->dbmeta.key_count = 0; + ZERO_LSN(newmeta->dbmeta.unused3); + + /* Update the version. */ + newmeta->dbmeta.version = 2; + + return (0); +} + +/* + * __qam_32_qammeta -- + * Upgrade the database from version 2 to version 3. 
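+ *	Version 3 drops the start field, adds page_ext, and changes cur_recno
+ *	to point at the first free slot.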
+ * + * PUBLIC: int __qam_32_qammeta __P((DB *, char *, u_int8_t *)); + */ +int +__qam_32_qammeta(dbp, real_name, buf) + DB *dbp; + char *real_name; + u_int8_t *buf; +{ + QMETA31 *oldmeta; + QMETA32 *newmeta; + + COMPQUIET(dbp, NULL); + COMPQUIET(real_name, NULL); + + newmeta = (QMETA32 *)buf; + oldmeta = (QMETA31 *)buf; + + /* + * Copy the fields to their new locations. + * We are dropping the first field so move + * from the top. + */ + newmeta->first_recno = oldmeta->first_recno; + newmeta->cur_recno = oldmeta->cur_recno; + newmeta->re_len = oldmeta->re_len; + newmeta->re_pad = oldmeta->re_pad; + newmeta->rec_page = oldmeta->rec_page; + newmeta->page_ext = 0; + /* cur_recno now points to the first free slot. */ + newmeta->cur_recno++; + if (newmeta->first_recno == 0) + newmeta->first_recno = 1; + + /* Update the version. */ + newmeta->dbmeta.version = 3; + + return (0); +} diff --git a/qam/qam_verify.c b/qam/qam_verify.c new file mode 100644 index 00000000..df6040a0 --- /dev/null +++ b/qam/qam_verify.c @@ -0,0 +1,633 @@ +/*- + * See the file LICENSE for redistribution information. + * + * Copyright (c) 1999-2009 Oracle. All rights reserved. + * + * $Id$ + */ + +#include "db_config.h" + +#include "db_int.h" +#include "dbinc/db_page.h" +#include "dbinc/db_verify.h" +#include "dbinc/db_am.h" +#include "dbinc/mp.h" +#include "dbinc/qam.h" +/* + * __qam_vrfy_meta -- + * Verify the queue-specific part of a metadata page. + * + * PUBLIC: int __qam_vrfy_meta __P((DB *, VRFY_DBINFO *, QMETA *, + * PUBLIC: db_pgno_t, u_int32_t)); + */ +int +__qam_vrfy_meta(dbp, vdp, meta, pgno, flags) + DB *dbp; + VRFY_DBINFO *vdp; + QMETA *meta; + db_pgno_t pgno; + u_int32_t flags; +{ + ENV *env; + QUEUE *qp; + VRFY_PAGEINFO *pip; + db_pgno_t *extents, extid, first, last; + size_t len; + int count, i, isbad, nextents, ret, t_ret; + char *buf, **names; + + COMPQUIET(count, 0); + + env = dbp->env; + qp = (QUEUE *)dbp->q_internal; + extents = NULL; + first = last = 0; + isbad = 0; + buf = NULL; + names = NULL; + + if ((ret = __db_vrfy_getpageinfo(vdp, pgno, &pip)) != 0) + return (ret); + + /* + * Queue can't be used in subdatabases, so if this isn't set + * something very odd is going on. + */ + if (!F_ISSET(pip, VRFY_INCOMPLETE)) + EPRINT((env, "Page %lu: queue databases must be one-per-file", + (u_long)pgno)); + + /* + * Because the metapage pointers are rolled forward by + * aborting transactions, the extent of the queue may + * extend beyond the allocated pages, so we do + * not check that meta_current is within the allocated + * pages. + */ + + /* + * re_len: If this is bad, we can't safely verify queue data pages, so + * return DB_VERIFY_FATAL + */ + if (DB_ALIGN(meta->re_len + sizeof(QAMDATA) - 1, sizeof(u_int32_t)) * + meta->rec_page + QPAGE_SZ(dbp) > dbp->pgsize) { + EPRINT((env, + "Page %lu: queue record length %lu too high for page size and recs/page", + (u_long)pgno, (u_long)meta->re_len)); + ret = DB_VERIFY_FATAL; + goto err; + } else { + /* + * We initialize the Queue internal pointer; we may need + * it when handling extents. It would get set up in open, + * if we called open normally, but we don't. + */ + vdp->re_pad = meta->re_pad; + qp->re_pad = (int)meta->re_pad; + qp->re_len = vdp->re_len = meta->re_len; + qp->rec_page = vdp->rec_page = meta->rec_page; + qp->page_ext = vdp->page_ext = meta->page_ext; + } + + /* + * There's no formal maximum extentsize, and a 0 value represents + * no extents, so there's nothing to verify. 
+ * + * Note that since QUEUE databases can't have subdatabases, it's an + * error to see more than one QUEUE metadata page in a single + * verifier run. Theoretically, this should really be a structure + * rather than a per-page check, but since we're setting qp fields + * here (and have only one qp to set) we raise the alarm now if + * this assumption fails. (We need the qp info to be reasonable + * before we do per-page verification of queue extents.) + */ + if (F_ISSET(vdp, VRFY_QMETA_SET)) { + isbad = 1; + EPRINT((env, + "Page %lu: database contains multiple Queue metadata pages", + (u_long)pgno)); + goto err; + } + F_SET(vdp, VRFY_QMETA_SET); + qp->page_ext = meta->page_ext; + dbp->pgsize = meta->dbmeta.pagesize; + qp->q_meta = pgno; + qp->q_root = pgno + 1; + vdp->first_recno = meta->first_recno; + vdp->last_recno = meta->cur_recno; + if (qp->page_ext != 0) { + first = QAM_RECNO_EXTENT(dbp, vdp->first_recno); + last = QAM_RECNO_EXTENT(dbp, vdp->last_recno); + } + + /* + * Look in the data directory to see if there are any extents + * around that are not in the range of the queue. If so, + * then report that and look there if we are salvaging. + */ + + if ((ret = __db_appname(env, + DB_APP_DATA, qp->dir, NULL, &buf)) != 0) + goto err; + if ((ret = __os_dirlist(env, buf, 0, &names, &count)) != 0) + goto err; + __os_free(env, buf); + buf = NULL; + + len = strlen(QUEUE_EXTENT_HEAD) + strlen(qp->name) + 1; + if ((ret = __os_malloc(env, len, &buf)) != 0) + goto err; + len = (size_t)snprintf(buf, len, QUEUE_EXTENT_HEAD, qp->name); + for (i = nextents = 0; i < count; i++) { + if (strncmp(names[i], buf, len) == 0) { + /* Only save extents out of bounds. */ + extid = (db_pgno_t)strtoul(&names[i][len], NULL, 10); + if (qp->page_ext != 0 && + (last > first ? + (extid >= first && extid <= last) : + (extid >= first || extid <= last))) + continue; + if (extents == NULL && (ret = __os_malloc( + env, (size_t)(count - i) * sizeof(extid), + &extents)) != 0) + goto err; + extents[nextents] = extid; + nextents++; + } + } + if (nextents > 0) + __db_errx(env, + "Warning: %d extra extent files found", nextents); + vdp->nextents = nextents; + vdp->extents = extents; + +err: if ((t_ret = __db_vrfy_putpageinfo(env, vdp, pip)) != 0 && ret == 0) + ret = t_ret; + if (names != NULL) + __os_dirfree(env, names, count); + if (buf != NULL) + __os_free(env, buf); + if (ret != 0 && extents != NULL) + __os_free(env, extents); + if (LF_ISSET(DB_SALVAGE) && + (t_ret = __db_salvage_markdone(vdp, pgno)) != 0 && ret == 0) + ret = t_ret; + return (ret == 0 && isbad == 1 ? DB_VERIFY_BAD : ret); +} + +/* + * __qam_meta2pgset -- + * For a given Queue meta page, add all of the db's pages to the pgset. Dealing + * with extents complicates things, as it is possible for there to be gaps in + * the page number sequence (the user could have re-inserted record numbers that + * had been on deleted extents) so we test the existence of each extent before + * adding its pages to the pgset. If there are no extents, just loop from + * first_recno to last_recno. 
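+ *	Because record numbers wrap at UINT32_MAX, first may be greater than
+ *	last, in which case the pages are added in two passes.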
+ * + * PUBLIC: int __qam_meta2pgset __P((DB *, VRFY_DBINFO *, DB *)); + */ +int +__qam_meta2pgset(dbp, vdp, pgset) + DB *dbp; + VRFY_DBINFO *vdp; + DB *pgset; +{ + DBC *dbc; + PAGE *h; + db_pgno_t first, last, pgno, pg_ext, stop; + int ret, t_ret; + u_int32_t i; + + ret = 0; + h = NULL; + if (vdp->last_recno <= vdp->first_recno) + return 0; + + pg_ext = vdp->page_ext; + + first = QAM_RECNO_PAGE(dbp, vdp->first_recno); + + /* + * last_recno gives the next recno to be allocated, we want the last + * allocated recno. + */ + last = QAM_RECNO_PAGE(dbp, vdp->last_recno - 1); + + if (first == PGNO_INVALID || last == PGNO_INVALID) + return (DB_VERIFY_BAD); + + pgno = first; + if (first > last) + stop = QAM_RECNO_PAGE(dbp, UINT32_MAX); + else + stop = last; + + /* + * If this db doesn't have extents, just add all page numbers from first + * to last. + */ + if (pg_ext == 0) { + for (pgno = first; pgno <= stop; pgno++) + if ((ret = __db_vrfy_pgset_inc( + pgset, vdp->thread_info, pgno)) != 0) + break; + if (first > last) + for (pgno = 1; pgno <= last; pgno++) + if ((ret = __db_vrfy_pgset_inc( + pgset, vdp->thread_info, pgno)) != 0) + break; + + return ret; + } + + if ((ret = __db_cursor(dbp, vdp->thread_info, NULL, &dbc, 0)) != 0) + return (ret); + /* + * Check if we can get the first page of each extent. If we can, then + * add all of that extent's pages to the pgset. If we can't, assume the + * extent doesn't exist and don't add any pages, if we're wrong we'll + * find the pages in __db_vrfy_walkpages. + */ +begin: for (; pgno <= stop; pgno += pg_ext) { + if ((ret = __qam_fget(dbc, &pgno, 0, &h)) != 0) { + if (ret == ENOENT || ret == DB_PAGE_NOTFOUND) { + ret = 0; + continue; + } + goto err; + } + if ((ret = __qam_fput(dbc, pgno, h, dbp->priority)) != 0) + goto err; + + for (i = 0; i < pg_ext && pgno + i <= last; i++) + if ((ret = __db_vrfy_pgset_inc( + pgset, vdp->thread_info, pgno + i)) != 0) + goto err; + + /* The first recno won't always occur on the first page of the + * extent. Back up to the beginning of the extent before the + * end of the loop so that the increment works correctly. + */ + if (pgno == first) + pgno = pgno % pg_ext + 1; + } + + if (first > last) { + pgno = 1; + first = last; + stop = last; + goto begin; + } + +err: + if ((t_ret = __dbc_close(dbc)) != 0 && ret == 0) + ret = t_ret; + + return ret; +} + +/* + * __qam_vrfy_data -- + * Verify a queue data page. + * + * PUBLIC: int __qam_vrfy_data __P((DB *, VRFY_DBINFO *, QPAGE *, + * PUBLIC: db_pgno_t, u_int32_t)); + */ +int +__qam_vrfy_data(dbp, vdp, h, pgno, flags) + DB *dbp; + VRFY_DBINFO *vdp; + QPAGE *h; + db_pgno_t pgno; + u_int32_t flags; +{ + DB fakedb; + struct __queue fakeq; + QAMDATA *qp; + db_recno_t i; + + /* + * Not much to do here, except make sure that flags are reasonable. + * + * QAM_GET_RECORD assumes a properly initialized q_internal + * structure, however, and we don't have one, so we play + * some gross games to fake it out. 
+ */ + fakedb.q_internal = &fakeq; + fakedb.flags = dbp->flags; + fakeq.re_len = vdp->re_len; + + for (i = 0; i < vdp->rec_page; i++) { + qp = QAM_GET_RECORD(&fakedb, h, i); + if ((u_int8_t *)qp >= (u_int8_t *)h + dbp->pgsize) { + EPRINT((dbp->env, + "Page %lu: queue record %lu extends past end of page", + (u_long)pgno, (u_long)i)); + return (DB_VERIFY_BAD); + } + + if (qp->flags & ~(QAM_VALID | QAM_SET)) { + EPRINT((dbp->env, + "Page %lu: queue record %lu has bad flags (%#lx)", + (u_long)pgno, (u_long)i, (u_long)qp->flags)); + return (DB_VERIFY_BAD); + } + } + + return (0); +} + +/* + * __qam_vrfy_structure -- + * Verify a queue database structure, such as it is. + * + * PUBLIC: int __qam_vrfy_structure __P((DB *, VRFY_DBINFO *, u_int32_t)); + */ +int +__qam_vrfy_structure(dbp, vdp, flags) + DB *dbp; + VRFY_DBINFO *vdp; + u_int32_t flags; +{ + VRFY_PAGEINFO *pip; + db_pgno_t i; + int ret, isbad; + + isbad = 0; + + if ((ret = __db_vrfy_getpageinfo(vdp, PGNO_BASE_MD, &pip)) != 0) + return (ret); + + if (pip->type != P_QAMMETA) { + EPRINT((dbp->env, + "Page %lu: queue database has no meta page", + (u_long)PGNO_BASE_MD)); + isbad = 1; + goto err; + } + + if ((ret = __db_vrfy_pgset_inc(vdp->pgset, vdp->thread_info, 0)) != 0) + goto err; + + for (i = 1; i <= vdp->last_pgno; i++) { + /* Send feedback to the application about our progress. */ + if (!LF_ISSET(DB_SALVAGE)) + __db_vrfy_struct_feedback(dbp, vdp); + + if ((ret = __db_vrfy_putpageinfo(dbp->env, vdp, pip)) != 0 || + (ret = __db_vrfy_getpageinfo(vdp, i, &pip)) != 0) + return (ret); + if (!F_ISSET(pip, VRFY_IS_ALLZEROES) && + pip->type != P_QAMDATA) { + EPRINT((dbp->env, + "Page %lu: queue database page of incorrect type %lu", + (u_long)i, (u_long)pip->type)); + isbad = 1; + goto err; + } else if ((ret = __db_vrfy_pgset_inc(vdp->pgset, + vdp->thread_info, i)) != 0) + goto err; + } + +err: if ((ret = __db_vrfy_putpageinfo(dbp->env, vdp, pip)) != 0) + return (ret); + return (isbad == 1 ? DB_VERIFY_BAD : 0); +} + +/* + * __qam_vrfy_walkqueue -- + * Do a "walkpages" per-page verification pass over the set of Queue + * extent pages. + * + * PUBLIC: int __qam_vrfy_walkqueue __P((DB *, VRFY_DBINFO *, void *, + * PUBLIC: int (*)(void *, const void *), u_int32_t)); + */ +int +__qam_vrfy_walkqueue(dbp, vdp, handle, callback, flags) + DB *dbp; + VRFY_DBINFO *vdp; + void *handle; + int (*callback) __P((void *, const void *)); + u_int32_t flags; +{ + DBC *dbc; + ENV *env; + PAGE *h; + QUEUE *qp; + VRFY_PAGEINFO *pip; + db_pgno_t first, i, last, pg_ext, stop; + int isbad, nextents, ret, t_ret; + + COMPQUIET(h, NULL); + + env = dbp->env; + qp = dbp->q_internal; + pip = NULL; + pg_ext = qp->page_ext; + isbad = ret = t_ret = 0; + h = NULL; + + /* If this database has no extents, we've seen all the pages already. */ + if (pg_ext == 0) + return (0); + + first = QAM_RECNO_PAGE(dbp, vdp->first_recno); + last = QAM_RECNO_PAGE(dbp, vdp->last_recno); + + i = first; + if (first > last) + stop = QAM_RECNO_PAGE(dbp, UINT32_MAX); + else + stop = last; + nextents = vdp->nextents; + + /* Verify/salvage each page. */ + if ((ret = __db_cursor(dbp, vdp->thread_info, NULL, &dbc, 0)) != 0) + return (ret); +begin: for (; i <= stop; i++) { + /* + * If DB_SALVAGE is set, we inspect our database of completed + * pages, and skip any we've already printed in the subdb pass. 
+ */ + if (LF_ISSET(DB_SALVAGE) && (__db_salvage_isdone(vdp, i) != 0)) + continue; + if ((t_ret = __qam_fget(dbc, &i, 0, &h)) != 0) { + if (t_ret == ENOENT || t_ret == DB_PAGE_NOTFOUND) { + i += (pg_ext - ((i - 1) % pg_ext)) - 1; + continue; + } + + /* + * If an individual page get fails, keep going iff + * we're salvaging. + */ + if (LF_ISSET(DB_SALVAGE)) { + if (ret == 0) + ret = t_ret; + continue; + } + h = NULL; + ret = t_ret; + goto err; + } + + if (LF_ISSET(DB_SALVAGE)) { + /* + * We pretty much don't want to quit unless a + * bomb hits. May as well return that something + * was screwy, however. + */ + if ((t_ret = __db_salvage_pg(dbp, + vdp, i, h, handle, callback, flags)) != 0) { + if (ret == 0) + ret = t_ret; + isbad = 1; + } + } else { + /* + * If we are not salvaging, and we get any error + * other than DB_VERIFY_BAD, return immediately; + * it may not be safe to proceed. If we get + * DB_VERIFY_BAD, keep going; listing more errors + * may make it easier to diagnose problems and + * determine the magnitude of the corruption. + */ + if ((ret = __db_vrfy_common(dbp, + vdp, h, i, flags)) == DB_VERIFY_BAD) + isbad = 1; + else if (ret != 0) + goto err; + + __db_vrfy_struct_feedback(dbp, vdp); + + if ((ret = __db_vrfy_getpageinfo(vdp, i, &pip)) != 0) + goto err; + if (F_ISSET(pip, VRFY_IS_ALLZEROES)) + goto put; + if (pip->type != P_QAMDATA) { + EPRINT((env, + "Page %lu: queue database page of incorrect type %lu", + (u_long)i, (u_long)pip->type)); + isbad = 1; + goto err; + } + if ((ret = __db_vrfy_pgset_inc(vdp->pgset, + vdp->thread_info, i)) != 0) + goto err; + if ((ret = __qam_vrfy_data(dbp, vdp, + (QPAGE *)h, i, flags)) == DB_VERIFY_BAD) + isbad = 1; + else if (ret != 0) + goto err; + +put: if ((ret = __db_vrfy_putpageinfo(env, vdp, pip)) != 0) + goto err1; + pip = NULL; + } + + /* Again, keep going iff we're salvaging. */ + if ((t_ret = __qam_fput(dbc, i, h, dbp->priority)) != 0) { + if (LF_ISSET(DB_SALVAGE)) { + if (ret == 0) + ret = t_ret; + continue; + } + ret = t_ret; + goto err1; + } + } + + if (first > last) { + i = 1; + stop = last; + first = last; + goto begin; + } + + /* + * Now check to see if there were any lingering + * extents and dump their data. + */ + if (LF_ISSET(DB_SALVAGE) && nextents != 0) { + nextents--; + i = 1 + + vdp->extents[nextents] * vdp->page_ext; + stop = i + vdp->page_ext; + goto begin; + } + + if (0) { +err: if (h != NULL && (t_ret = + __qam_fput(dbc, i, h, dbp->priority)) != 0 && ret == 0) + ret = t_ret; + if (pip != NULL && (t_ret = + __db_vrfy_putpageinfo(env, vdp, pip)) != 0 && ret == 0) + ret = t_ret; + } +err1: if (dbc != NULL && (t_ret = __dbc_close(dbc)) != 0 && ret == 0) + ret = t_ret; + return ((isbad == 1 && ret == 0) ? DB_VERIFY_BAD : ret); +} + +/* + * __qam_salvage -- + * Safely dump out all recnos and data on a queue page. 
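+ *	Each record is printed as its recno key followed by the fixed-length
+ *	data; slots that were never set are skipped, and deleted records are
+ *	dumped only under DB_AGGRESSIVE.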
+ * + * PUBLIC: int __qam_salvage __P((DB *, VRFY_DBINFO *, db_pgno_t, PAGE *, + * PUBLIC: void *, int (*)(void *, const void *), u_int32_t)); + */ +int +__qam_salvage(dbp, vdp, pgno, h, handle, callback, flags) + DB *dbp; + VRFY_DBINFO *vdp; + db_pgno_t pgno; + PAGE *h; + void *handle; + int (*callback) __P((void *, const void *)); + u_int32_t flags; +{ + DBT dbt, key; + QAMDATA *qp, *qep; + db_recno_t recno; + int ret, err_ret, t_ret; + u_int32_t pagesize, qlen; + u_int32_t i; + + memset(&dbt, 0, sizeof(DBT)); + memset(&key, 0, sizeof(DBT)); + + err_ret = ret = 0; + + pagesize = (u_int32_t)dbp->mpf->mfp->stat.st_pagesize; + qlen = ((QUEUE *)dbp->q_internal)->re_len; + dbt.size = qlen; + key.data = &recno; + key.size = sizeof(recno); + recno = (pgno - 1) * QAM_RECNO_PER_PAGE(dbp) + 1; + i = 0; + qep = (QAMDATA *)((u_int8_t *)h + pagesize - qlen); + for (qp = QAM_GET_RECORD(dbp, h, i); qp < qep; + recno++, i++, qp = QAM_GET_RECORD(dbp, h, i)) { + if (F_ISSET(qp, ~(QAM_VALID|QAM_SET))) + continue; + if (!F_ISSET(qp, QAM_SET)) + continue; + + if (!LF_ISSET(DB_AGGRESSIVE) && !F_ISSET(qp, QAM_VALID)) + continue; + + dbt.data = qp->data; + if ((ret = __db_vrfy_prdbt(&key, + 0, " ", handle, callback, 1, vdp)) != 0) + err_ret = ret; + + if ((ret = __db_vrfy_prdbt(&dbt, + 0, " ", handle, callback, 0, vdp)) != 0) + err_ret = ret; + } + + if ((t_ret = __db_salvage_markdone(vdp, pgno)) != 0) + return (t_ret); + return ((ret == 0 && err_ret != 0) ? err_ret : ret); +} |