Diffstat (limited to 'db/btree/bt_cursor.c')
-rw-r--r-- | db/btree/bt_cursor.c | 292
1 file changed, 154 insertions, 138 deletions
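The bulk of this change converts the cursor macros (ACQUIRE, ACQUIRE_CUR, DISCARD_CUR, and friends) from bare brace blocks to the do { ... } while (0) idiom. The following standalone sketch shows why that matters; the RELEASE_BAD/RELEASE_OK names are illustrative only and do not appear in the patch.

#include <stdio.h>
#include <stdlib.h>

/*
 * Illustrative macros only -- not part of the patch.  The bare-brace
 * version expands to "{ ... };" when the caller adds a semicolon,
 * which orphans a following "else"; the do/while(0) version is a
 * single statement, so it composes safely with un-braced if/else.
 */
#define RELEASE_BAD(p, ret) { \
	free(p); \
	(p) = NULL; \
	(ret) = 0; \
}

#define RELEASE_OK(p, ret) do { \
	free(p); \
	(p) = NULL; \
	(ret) = 0; \
} while (0)

int
main(void)
{
	void *p;
	int ret;

	p = malloc(16);
	if (p != NULL)
		RELEASE_OK(p, ret);	/* RELEASE_BAD here would orphan the else. */
	else
		ret = -1;
	printf("ret = %d\n", ret);
	return (0);
}

The wrapped macros also let call sites such as "if (ret != 0) DISCARD_CUR(dbc, ret);" remain correct without extra braces.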
diff --git a/db/btree/bt_cursor.c b/db/btree/bt_cursor.c
index 067da53be..82d6cc435 100644
--- a/db/btree/bt_cursor.c
+++ b/db/btree/bt_cursor.c
@@ -1,16 +1,14 @@
 /*-
  * See the file LICENSE for redistribution information.
  *
- * Copyright (c) 1996-2003
+ * Copyright (c) 1996-2004
  *	Sleepycat Software.  All rights reserved.
+ *
+ * $Id: bt_cursor.c,v 11.190 2004/09/22 21:46:32 ubell Exp $
  */
 
 #include "db_config.h"
 
-#ifndef lint
-static const char revid[] = "$Id: bt_cursor.c,v 11.169 2003/11/19 18:41:06 bostic Exp $";
-#endif /* not lint */
-
 #ifndef NO_SYSTEM_INCLUDES
 #include <sys/types.h>
 
@@ -53,11 +51,11 @@ static int __bam_isopd __P((DBC *, db_pgno_t *));
  * don't -- we don't duplicate locks when we duplicate cursors if we are
  * running in a transaction environment as there's no point if locks are
  * never discarded.  This means that the cursor may or may not hold a lock.
- * In the case where we are decending the tree we always want to
- * unlock the held interior page so we use ACQUIRE_COUPLE.
+ * In the case where we are descending the tree we always want to unlock
+ * the held interior page so we use ACQUIRE_COUPLE.
  */
 #undef ACQUIRE
-#define ACQUIRE(dbc, mode, lpgno, lock, fpgno, pagep, ret) { \
+#define ACQUIRE(dbc, mode, lpgno, lock, fpgno, pagep, ret) do { \
 	DB_MPOOLFILE *__mpf = (dbc)->dbp->mpf; \
 	if ((pagep) != NULL) { \
 		ret = __memp_fput(__mpf, pagep, 0); \
@@ -68,10 +66,10 @@ static int __bam_isopd __P((DBC *, db_pgno_t *));
 	ret = __db_lget(dbc, LCK_COUPLE, lpgno, mode, 0, &(lock));\
 	if ((ret) == 0) \
 		ret = __memp_fget(__mpf, &(fpgno), 0, &(pagep)); \
-}
+} while (0)
 
 #undef ACQUIRE_COUPLE
-#define ACQUIRE_COUPLE(dbc, mode, lpgno, lock, fpgno, pagep, ret) { \
+#define ACQUIRE_COUPLE(dbc, mode, lpgno, lock, fpgno, pagep, ret) do { \
 	DB_MPOOLFILE *__mpf = (dbc)->dbp->mpf; \
 	if ((pagep) != NULL) { \
 		ret = __memp_fput(__mpf, pagep, 0); \
@@ -83,37 +81,37 @@ static int __bam_isopd __P((DBC *, db_pgno_t *));
 	    LCK_COUPLE_ALWAYS, lpgno, mode, 0, &(lock)); \
 	if ((ret) == 0) \
 		ret = __memp_fget(__mpf, &(fpgno), 0, &(pagep)); \
-}
+} while (0)
 
 /* Acquire a new page/lock for a cursor. */
 #undef ACQUIRE_CUR
-#define ACQUIRE_CUR(dbc, mode, p, ret) { \
+#define ACQUIRE_CUR(dbc, mode, p, ret) do { \
 	BTREE_CURSOR *__cp = (BTREE_CURSOR *)(dbc)->internal; \
-	if (p != __cp->pgno) \
-		__cp->pgno = PGNO_INVALID; \
+	if (p != __cp->pgno) \
+		__cp->pgno = PGNO_INVALID; \
 	ACQUIRE(dbc, mode, p, __cp->lock, p, __cp->page, ret); \
 	if ((ret) == 0) { \
 		__cp->pgno = p; \
 		__cp->lock_mode = (mode); \
 	} \
-}
+} while (0)
 
 /*
  * Acquire a new page/lock for a cursor and release the previous.
- * This is typically used when decending a tree and we do not
+ * This is typically used when descending a tree and we do not
  * want to hold the interior nodes locked.
 */
 #undef ACQUIRE_CUR_COUPLE
-#define ACQUIRE_CUR_COUPLE(dbc, mode, p, ret) { \
+#define ACQUIRE_CUR_COUPLE(dbc, mode, p, ret) do { \
 	BTREE_CURSOR *__cp = (BTREE_CURSOR *)(dbc)->internal; \
-	if (p != __cp->pgno) \
-		__cp->pgno = PGNO_INVALID; \
+	if (p != __cp->pgno) \
+		__cp->pgno = PGNO_INVALID; \
 	ACQUIRE_COUPLE(dbc, mode, p, __cp->lock, p, __cp->page, ret); \
 	if ((ret) == 0) { \
 		__cp->pgno = p; \
 		__cp->lock_mode = (mode); \
 	} \
-}
+} while (0)
 
 /*
  * Acquire a write lock if we don't already have one.
@@ -122,7 +120,7 @@ static int __bam_isopd __P((DBC *, db_pgno_t *));
  * See ACQUIRE macro on why we handle cursors that don't have locks.
  */
 #undef ACQUIRE_WRITE_LOCK
-#define ACQUIRE_WRITE_LOCK(dbc, ret) { \
+#define ACQUIRE_WRITE_LOCK(dbc, ret) do { \
 	BTREE_CURSOR *__cp = (BTREE_CURSOR *)(dbc)->internal; \
 	ret = 0; \
 	if (STD_LOCKING(dbc) && \
@@ -131,25 +129,27 @@ static int __bam_isopd __P((DBC *, db_pgno_t *));
 	    LOCK_ISSET(__cp->lock) ? LCK_COUPLE : 0, \
 	    __cp->pgno, DB_LOCK_WRITE, 0, &__cp->lock)) == 0) \
 		__cp->lock_mode = DB_LOCK_WRITE; \
-}
+} while (0)
 
 /* Discard the current page/lock for a cursor. */
 #undef DISCARD_CUR
-#define DISCARD_CUR(dbc, ret) { \
+#define DISCARD_CUR(dbc, ret) do { \
 	BTREE_CURSOR *__cp = (BTREE_CURSOR *)(dbc)->internal; \
 	DB_MPOOLFILE *__mpf = (dbc)->dbp->mpf; \
 	int __t_ret; \
 	if ((__cp->page) != NULL) { \
-		ret = __memp_fput(__mpf, __cp->page, 0); \
+		__t_ret = __memp_fput(__mpf, __cp->page, 0); \
 		__cp->page = NULL; \
 	} else \
-		ret = 0; \
+		__t_ret = 0; \
+	if (__t_ret != 0 && (ret) == 0) \
+		ret = __t_ret; \
 	__t_ret = __TLPUT((dbc), __cp->lock); \
 	if (__t_ret != 0 && (ret) == 0) \
 		ret = __t_ret; \
 	if ((ret) == 0 && !LOCK_ISSET(__cp->lock)) \
 		__cp->lock_mode = DB_LOCK_NG; \
-}
+} while (0)
 
 /* If on-page item is a deleted record. */
 #undef IS_DELETED
@@ -308,7 +308,7 @@ __bam_c_close(dbc, root_pgno, rmroot)
 	DBC *dbc_opd, *dbc_c;
 	DB_MPOOLFILE *mpf;
 	PAGE *h;
-	int cdb_lock, ret, t_ret;
+	int cdb_lock, ret;
 
 	dbp = dbc->dbp;
 	mpf = dbp->mpf;
@@ -426,8 +426,8 @@ __bam_c_close(dbc, root_pgno, rmroot)
 		case DB_QUEUE:
 		case DB_UNKNOWN:
 		default:
-			return (__db_unknown_type(dbp->dbenv,
-			    "__bam_c_close", dbc->dbtype));
+			return (__db_unknown_type(
+			    dbp->dbenv, "__bam_c_close", dbc->dbtype));
 		}
 	}
 	goto done;
@@ -448,32 +448,31 @@ lock:	cp_c = (BTREE_CURSOR *)dbc_c->internal;
 				goto err;
 			cdb_lock = 1;
 		}
-		if ((ret = __memp_fget(mpf, &cp_c->pgno, 0, &cp_c->page)) != 0)
-			goto err;
 		goto delete;
 	}
 
 	/*
 	 * The variable dbc_c has been initialized to reference the cursor in
-	 * which we're going to do the delete.  Initialize the cursor's page
-	 * and lock structures as necessary.
+	 * which we're going to do the delete.  Initialize the cursor's lock
+	 * structures as necessary.
 	 *
 	 * First, we may not need to acquire any locks.  If we're in case #3,
 	 * that is, the primary database isn't a btree database, our caller
 	 * is responsible for acquiring any necessary locks before calling us.
 	 */
-	if (F_ISSET(dbc, DBC_OPD)) {
-		if ((ret = __memp_fget(mpf, &cp_c->pgno, 0, &cp_c->page)) != 0)
-			goto err;
+	if (F_ISSET(dbc, DBC_OPD))
 		goto delete;
-	}
 
 	/*
-	 * Otherwise, acquire a write lock.  If the cursor that did the initial
-	 * logical deletion (and which had a write lock) is not the same as the
-	 * cursor doing the physical deletion (which may have only ever had a
-	 * read lock on the item), we need to upgrade.  The confusion comes as
-	 * follows:
+	 * Otherwise, acquire a write lock on the primary database's page.
+	 *
+	 * Lock the primary database page, regardless of whether we're deleting
+	 * an item on a primary database page or an off-page duplicates page.
+	 *
+	 * If the cursor that did the initial logical deletion (and had a write
+	 * lock) is not the same cursor doing the physical deletion (which may
+	 * have only ever had a read lock on the item), we need to upgrade to a
+	 * write lock.  The confusion comes as follows:
 	 *
 	 *	C1	created, acquires item read lock
 	 *	C2	dup C1, create C2, also has item read lock.
@@ -483,29 +482,37 @@ lock:	cp_c = (BTREE_CURSOR *)dbc_c->internal;
 	 *
 	 * If we're in a TXN, we know that C2 will be able to acquire the write
 	 * lock, because no locker other than the one shared by C1 and C2 can
-	 * acquire a write lock -- the original write lock C1 acquire was never
+	 * acquire a write lock -- the original write lock C1 acquired was never
 	 * discarded.
 	 *
 	 * If we're not in a TXN, it's nastier.  Other cursors might acquire
 	 * read locks on the item after C1 closed, discarding its write lock,
 	 * and such locks would prevent C2 from acquiring a read lock.  That's
-	 * OK, though, we'll simply wait until we can acquire a read lock, or
+	 * OK, though, we'll simply wait until we can acquire a write lock, or
 	 * we'll deadlock.  (Which better not happen, since we're not in a TXN.)
 	 *
-	 * Lock the primary database page, regardless of whether we're deleting
-	 * an item on a primary database page or an off-page duplicates page.
+	 * There are similar scenarios with dirty reads, where the cursor may
+	 * have downgraded its write lock to a was-write lock.
 	 */
-	ACQUIRE(dbc, DB_LOCK_WRITE,
-	    cp->pgno, cp_c->lock, cp_c->pgno, cp_c->page, ret);
-	if (ret != 0)
-		goto err;
+	if (STD_LOCKING(dbc))
+		if ((ret = __db_lget(dbc,
+		    LCK_COUPLE, cp->pgno, DB_LOCK_WRITE, 0, &cp->lock)) != 0)
+			goto err;
 
 delete:	/*
-	 * If the delete occurred in a btree, delete the on-page physical item
-	 * referenced by the cursor.
+	 * If the delete occurred in a Btree, we're going to look at the page
+	 * to see if the item has to be physically deleted.  Otherwise, we do
+	 * not need the actual page (and it may not even exist, it might have
+	 * been truncated from the file after an allocation aborted).
+	 *
+	 * Delete the on-page physical item referenced by the cursor.
 	 */
-	if (dbc_c->dbtype == DB_BTREE && (ret = __bam_c_physdel(dbc_c)) != 0)
-		goto err;
+	if (dbc_c->dbtype == DB_BTREE) {
+		if ((ret = __memp_fget(mpf, &cp_c->pgno, 0, &cp_c->page)) != 0)
+			goto err;
+		if ((ret = __bam_c_physdel(dbc_c)) != 0)
+			goto err;
+	}
 
 	/*
 	 * If we're not working in an off-page duplicate tree, then we're
@@ -526,6 +533,9 @@ delete:	/*
 		if ((ret = __memp_fget(mpf, &root_pgno, 0, &h)) != 0)
 			goto err;
 		if (NUM_ENT(h) == 0) {
+			DISCARD_CUR(dbc_c, ret);
+			if (ret != 0)
+				goto err;
 			if ((ret = __db_free(dbc, h)) != 0)
 				goto err;
 		} else {
@@ -558,14 +568,9 @@ done:	/*
 	 * Discard the page references and locks, and confirm that the stack
 	 * has been emptied.
 	 */
-	if (dbc_opd != NULL) {
-		DISCARD_CUR(dbc_opd, t_ret);
-		if (t_ret != 0 && ret == 0)
-			ret = t_ret;
-	}
-	DISCARD_CUR(dbc, t_ret);
-	if (t_ret != 0 && ret == 0)
-		ret = t_ret;
+	if (dbc_opd != NULL)
+		DISCARD_CUR(dbc_opd, ret);
+	DISCARD_CUR(dbc, ret);
 
 	/* Downgrade any CDB lock we acquired. */
 	if (cdb_lock)
@@ -785,11 +790,11 @@ __bam_c_dup(orig_dbc, new_dbc)
 	 * holding inside a transaction because all the locks are retained
	 * until the transaction commits or aborts.
 	 */
-	if (LOCK_ISSET(orig->lock) && orig_dbc->txn == NULL) {
+	if (orig_dbc->txn == NULL && LOCK_ISSET(orig->lock))
 		if ((ret = __db_lget(new_dbc,
 		    0, new->pgno, new->lock_mode, 0, &new->lock)) != 0)
 			return (ret);
-	}
+
 	new->ovflsize = orig->ovflsize;
 	new->recno = orig->recno;
 	new->flags = orig->flags;
@@ -1064,9 +1069,9 @@ __bam_bulk(dbc, data, flags)
 	cp = (BTREE_CURSOR *)dbc->internal;
 
 	/*
-	 * dp tracks the beginging of the page in the buffer.
+	 * dp tracks the beginning of the page in the buffer.
 	 * np is the next place to copy things into the buffer.
-	 * dbuf always stays at the beging of the buffer.
+	 * dbuf always stays at the beginning of the buffer.
 	 */
 	dbuf = data->data;
 	np = dp = dbuf;
@@ -1172,10 +1177,11 @@ next_pg:
 get_key_space:
 				/* Nothing added, then error. */
 				if (offp == endp) {
-					data->size =
-					    ALIGN(size +
+					data->size = (u_int32_t)
+					    DB_ALIGN(size +
 					    pagesize, 1024);
-					return (ENOMEM);
+					return
+					    (DB_BUFFER_SMALL);
 				}
 				/*
 				 * We need to back up to the
@@ -1246,7 +1252,7 @@ get_key_space:
 				if ((ret = __bam_bulk_duplicates(dbc,
 				    bo->pgno, dbuf, is_key ? offp + P_INDX : NULL,
 				    &offp, &np, &space, no_dup)) != 0) {
-					if (ret == ENOMEM) {
+					if (ret == DB_BUFFER_SMALL) {
 						size = space;
 						space = 0;
 						/* If nothing was added, then error. */
@@ -1307,9 +1313,10 @@ get_space:
 				if (offp >=
 				    (is_key ? &endp[-1] : endp) ||
 				    F_ISSET(dbc, DBC_TRANSIENT)) {
-					data->size = ALIGN(size +
+					data->size = (u_int32_t)
+					    DB_ALIGN(size +
 					    data->ulen - space, 1024);
-					return (ENOMEM);
+					return (DB_BUFFER_SMALL);
 				}
 				break;
 			}
@@ -1339,8 +1346,8 @@ get_space:
 			indx += adj;
 		}
 		/*
-		 * Stop when we either run off the page or we
-		 * move to the next key and we are not returning mulitple keys.
+		 * Stop when we either run off the page or we move to the next key and
+		 * we are not returning multiple keys.
 		 */
 	} while ((indx += adj) < NUM_ENT(pg) &&
 	    (next_key || pg_keyoff == inp[indx]));
@@ -1365,14 +1372,14 @@ get_space:
 	if (ret == 0 && indx < pg->entries &&
 	    F_ISSET(dbc, DBC_TRANSIENT) && pg_keyoff == inp[indx]) {
 		data->size = (data->ulen - space) + size;
-		return (ENOMEM);
+		return (DB_BUFFER_SMALL);
 	}
 	/*
 	 * Must leave the index pointing at the last record fetched.
 	 * If we are not fetching keys, we may have stepped to the
 	 * next key.
 	 */
-	if (ret == ENOMEM || next_key || pg_keyoff == inp[indx])
+	if (ret == DB_BUFFER_SMALL || next_key || pg_keyoff == inp[indx])
 		cp->indx = indx;
 	else
 		cp->indx = indx - P_INDX;
@@ -1462,7 +1469,7 @@ __bam_bulk_duplicates(dbc, pgno, dbuf, keyoff, offpp, dpp, spacep, no_dup)
 
 	/*
 	 * np is the next place to put data.
-	 * dp is the begining of the current page in the buffer.
+	 * dp is the beginning of the current page in the buffer.
 	 */
 	np = dp = *dpp;
 	first = 1;
@@ -1489,7 +1496,7 @@ __bam_bulk_duplicates(dbc, pgno, dbuf, keyoff, offpp, dpp, spacep, no_dup)
 
 			/* Did space underflow? */
 			if (space > *spacep) {
-				ret = ENOMEM;
+				ret = DB_BUFFER_SMALL;
 				if (first == 1) {
 					/* Get the absolute value. */
 					space = -(int32_t)space;
@@ -1503,7 +1510,7 @@ __bam_bulk_duplicates(dbc, pgno, dbuf, keyoff, offpp, dpp, spacep, no_dup)
 			bo = (BOVERFLOW *)bk;
 			size = bo->tlen;
 			if (size > space) {
-				ret = ENOMEM;
+				ret = DB_BUFFER_SMALL;
 				space = *spacep + size;
 				break;
 			}
@@ -1522,7 +1529,7 @@ __bam_bulk_duplicates(dbc, pgno, dbuf, keyoff, offpp, dpp, spacep, no_dup)
 			dp = np;
 			size = pagesize - HOFFSET(pg);
 			if (space < size) {
-				ret = ENOMEM;
+				ret = DB_BUFFER_SMALL;
 				/* Return space required. */
 				space = *spacep + size;
 				break;
@@ -1565,7 +1572,7 @@ contin:
 	 * If we ran out of space back up the pointer.
 	 * If we did not return any dups or reached the end, close the opd.
 	 */
-	if (ret == ENOMEM) {
+	if (ret == DB_BUFFER_SMALL) {
 		if (opd->dbtype == DB_RECNO) {
 			if (--cp->recno == 0)
 				goto close_opd;
@@ -1780,6 +1787,7 @@ __bam_c_put(dbc, key, data, flags, pgnop)
 	u_int32_t flags;
 	db_pgno_t *pgnop;
 {
+	BTREE *t;
 	BTREE_CURSOR *cp;
 	DB *dbp;
 	DBT dbt;
@@ -1796,39 +1804,16 @@ __bam_c_put(dbc, key, data, flags, pgnop)
 
 split:	ret = stack = 0;
 	switch (flags) {
+	case DB_CURRENT:
+		if (F_ISSET(cp, C_DELETED))
+			return (DB_NOTFOUND);
+		/* FALLTHROUGH */
+
 	case DB_AFTER:
 	case DB_BEFORE:
-	case DB_CURRENT:
 		iiop = flags;
 		own = 1;
 
-		/*
-		 * If the Btree has record numbers (and we're not replacing an
-		 * existing record), we need a complete stack so that we can
-		 * adjust the record counts.  The check for flags == DB_CURRENT
-		 * is superfluous but left in for clarity.  (If C_RECNUM is set
-		 * we know that flags must be DB_CURRENT, as DB_AFTER/DB_BEFORE
-		 * are illegal in a Btree unless it's configured for duplicates
-		 * and you cannot configure a Btree for both record renumbering
-		 * and duplicates.)
-		 */
-		if (flags == DB_CURRENT &&
-		    F_ISSET(cp, C_RECNUM) && F_ISSET(cp, C_DELETED)) {
-			if ((ret = __bam_c_getstack(dbc)) != 0)
-				goto err;
-			/*
-			 * Initialize the cursor from the stack.  Don't take
-			 * the page number or page index, they should already
-			 * be set.
-			 */
-			cp->page = cp->csp->page;
-			cp->lock = cp->csp->lock;
-			cp->lock_mode = cp->csp->lock_mode;
-
-			stack = 1;
-			break;
-		}
-
 		/* Acquire the current page with a write lock. */
 		ACQUIRE_WRITE_LOCK(dbc, ret);
 		if (ret != 0)
@@ -1996,12 +1981,11 @@ split:	ret = stack = 0;
 
 	/*
 	 * SR [#6059]
-	 * If we do not own a lock on the page anymore then
-	 * clear the cursor so we don't point at it.
-	 * Even if we call __bam_stkrel above we still
-	 * may have entered the routine with the cursor
-	 * posistioned to a particular record. This
-	 * is in the case where C_RECNUM is set.
+	 * If we do not own a lock on the page any more, then clear the
+	 * cursor so we don't point at it.  Even if we call __bam_stkrel
+	 * above we still may have entered the routine with the cursor
+	 * positioned to a particular record.  This is in the case
+	 * where C_RECNUM is set.
 	 */
 	if (own == 0) {
 		cp->pgno = PGNO_INVALID;
@@ -2019,6 +2003,33 @@ split:	ret = stack = 0;
 
 err:
 done:	/*
+	 * If we inserted a key into the first or last slot of the tree,
+	 * remember where it was so we can do it more quickly next time.
+	 * If the tree has record numbers, we need a complete stack so
+	 * that we can adjust the record counts, so skipping the tree search
+	 * isn't possible.  For subdatabases we need to be careful that the
+	 * page does not move from one db to another, so we track its LSN.
+	 *
+	 * If there are duplicates and we are inserting into the last slot,
+	 * the cursor will point _to_ the last item, not after it, which
+	 * is why we subtract P_INDX below.
+	 */
+
+	t = dbp->bt_internal;
+	if (ret == 0 && TYPE(cp->page) == P_LBTREE &&
+	    (flags == DB_KEYFIRST || flags == DB_KEYLAST) &&
+	    !F_ISSET(cp, C_RECNUM) &&
+	    (!F_ISSET(dbp, DB_AM_SUBDB) ||
+	    (LOGGING_ON(dbp->dbenv) && !F_ISSET(dbp, DB_AM_NOT_DURABLE))) &&
+	    ((NEXT_PGNO(cp->page) == PGNO_INVALID &&
+	    cp->indx >= NUM_ENT(cp->page) - P_INDX) ||
+	    (PREV_PGNO(cp->page) == PGNO_INVALID && cp->indx == 0))) {
+		t->bt_lpgno = cp->pgno;
+		if (F_ISSET(dbp, DB_AM_SUBDB))
+			t->bt_llsn = LSN(cp->page);
+	} else
+		t->bt_lpgno = PGNO_INVALID;
+	/*
 	 * Discard any pages pinned in the tree and their locks, except for
 	 * the leaf page.  Note, the leaf page participated in any stack we
 	 * acquired, and so we have to adjust the stack as necessary.  If
@@ -2371,7 +2382,7 @@ __bam_c_search(dbc, root_pgno, key, flags, exactp)
 	db_pgno_t bt_lpgno;
 	db_recno_t recno;
 	u_int32_t sflags;
-	int cmp, ret;
+	int cmp, ret, t_ret;
 
 	dbp = dbc->dbp;
 	cp = (BTREE_CURSOR *)dbc->internal;
@@ -2415,12 +2426,8 @@ fast_search:	/*
 	 * If the application has a history of inserting into the first
 	 * or last pages of the database, we check those pages first to
 	 * avoid doing a full search.
-	 *
-	 * If the tree has record numbers, we need a complete stack so
-	 * that we can adjust the record counts, so fast_search isn't
-	 * possible.
 	 */
-	if (F_ISSET(cp, C_RECNUM))
+	if (F_ISSET(dbc, DBC_OPD))
 		goto search;
 
 	/*
@@ -2440,14 +2447,24 @@ fast_search:	/*
 	if (bt_lpgno == PGNO_INVALID)
 		goto search;
 
-	/* Lock and retrieve the page on which we last inserted. */
+	/*
+	 * Lock and retrieve the page on which we last inserted.
+	 *
+	 * The page may not exist: if a transaction created the page
+	 * and then aborted, the page might have been truncated from
+	 * the end of the file.
+	 */
 	h = NULL;
 	ACQUIRE_CUR(dbc, DB_LOCK_WRITE, bt_lpgno, ret);
-	if (ret != 0)
+	if (ret != 0) {
+		if (ret == DB_PAGE_NOTFOUND)
+			ret = 0;
 		goto fast_miss;
+	}
 
 	h = cp->page;
 	inp = P_INP(dbp, h);
+
 	/*
 	 * It's okay if the page type isn't right or it's empty, it
 	 * just means that the world changed.
@@ -2455,6 +2472,10 @@ fast_search:	/*
 	if (TYPE(h) != P_LBTREE || NUM_ENT(h) == 0)
 		goto fast_miss;
 
+	/* Verify that this page cannot have moved to another db. */
+	if (F_ISSET(dbp, DB_AM_SUBDB) &&
+	    log_compare(&t->bt_llsn, &LSN(h)) != 0)
+		goto fast_miss;
 	/*
 	 * What we do here is test to see if we're at the beginning or
 	 * end of the tree and if the new item sorts before/after the
@@ -2537,10 +2558,13 @@ fast_hit:	/* Set the exact match flag, we may have found a duplicate. */
 fast_miss:	/*
 	 * This was not the right page, so we do not need to retain
 	 * the lock even in the presence of transactions.
+	 *
+	 * This is also an error path, so ret may have been set.
 	 */
 	DISCARD_CUR(dbc, ret);
 	cp->pgno = PGNO_INVALID;
-	(void)__LPUT(dbc, cp->lock);
+	if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0)
+		ret = t_ret;
 
 	if (ret != 0)
 		return (ret);
@@ -2559,20 +2583,6 @@ search:	if ((ret = __bam_search(dbc, root_pgno,
 	cp->lock = cp->csp->lock;
 	cp->lock_mode = cp->csp->lock_mode;
 
-	/*
-	 * If we inserted a key into the first or last slot of the tree,
-	 * remember where it was so we can do it more quickly next time.
-	 * If there are duplicates and we are inserting into the last slot,
-	 * the cursor will point _to_ the last item, not after it, which
-	 * is why we subtract P_INDX below.
-	 */
-	if (TYPE(cp->page) == P_LBTREE &&
-	    (flags == DB_KEYFIRST || flags == DB_KEYLAST))
-		t->bt_lpgno =
-		    (NEXT_PGNO(cp->page) == PGNO_INVALID &&
-		    cp->indx >= NUM_ENT(cp->page) - P_INDX) ||
-		    (PREV_PGNO(cp->page) == PGNO_INVALID &&
-		    cp->indx == 0) ? cp->pgno : PGNO_INVALID;
 	return (0);
 }
 
@@ -2661,6 +2671,10 @@ __bam_c_physdel(dbc)
 	}
 	if ((ret = __bam_ditem(dbc, cp->page, cp->indx)) != 0)
 		return (ret);
+
+	/* Clear the deleted flag, the item is gone. */
+	F_CLR(cp, C_DELETED);
+
 	if (!empty_page)
 		if ((ret = __bam_ca_di(dbc, PGNO(cp->page), cp->indx, -1)) != 0)
			return (ret);
@@ -2760,6 +2774,8 @@ __bam_c_physdel(dbc)
 	 * stack and page locks without further damage.
 	 */
 	if (ret == 0)
+		DISCARD_CUR(dbc, ret);
+	if (ret == 0)
 		ret = __bam_dpages(dbc, cp->sp);
 	else
 		(void)__bam_stkrel(dbc, 0);
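The other recurring change in this diff is the bulk-get error protocol: where the older code returned ENOMEM when the caller's buffer was too small, the cursor code now returns DB_BUFFER_SMALL and reports the space required (rounded up with DB_ALIGN) in data->size. A caller-side retry loop under that convention might look like the sketch below; the bulk_read() wrapper name and the 64KB starting buffer size are assumptions for illustration, not code from the patch.

#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <db.h>

/*
 * Sketch only: retry a bulk read when the user buffer is too small,
 * assuming the 4.3-era convention that DBC->c_get returns
 * DB_BUFFER_SMALL (not ENOMEM) and sets data.size to the space needed.
 */
int
bulk_read(DBC *dbc)
{
	DBT key, data;
	void *buf, *tbuf;
	int ret;

	memset(&key, 0, sizeof(key));
	memset(&data, 0, sizeof(data));
	data.ulen = 64 * 1024;			/* Must be at least one page. */
	if ((data.data = buf = malloc(data.ulen)) == NULL)
		return (ENOMEM);
	data.flags = DB_DBT_USERMEM;

	while ((ret = dbc->c_get(dbc,
	    &key, &data, DB_MULTIPLE_KEY | DB_NEXT)) == DB_BUFFER_SMALL) {
		/* data.size now holds the (aligned) size required. */
		if ((tbuf = realloc(buf, data.size)) == NULL) {
			ret = ENOMEM;
			break;
		}
		data.data = buf = tbuf;
		data.ulen = data.size;
	}

	/* On success, walk the buffer with the DB_MULTIPLE_KEY_NEXT macro. */

	free(buf);
	return (ret == DB_NOTFOUND ? 0 : ret);
}

Returning a dedicated error code keeps ENOMEM free to mean a genuine allocation failure, so callers can distinguish "grow the buffer and retry" from "out of memory".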