Imported Upstream version 5.3.21
[platform/upstream/libdb.git] / src / btree / bt_cursor.c
1 /*-
2  * See the file LICENSE for redistribution information.
3  *
4  * Copyright (c) 1996, 2012 Oracle and/or its affiliates.  All rights reserved.
5  *
6  * $Id$
7  */
8
9 #include "db_config.h"
10
11 #include "db_int.h"
12 #include "dbinc/db_page.h"
13 #include "dbinc/btree.h"
14 #include "dbinc/lock.h"
15 #include "dbinc/mp.h"
16
17 static int  __bam_bulk __P((DBC *, DBT *, u_int32_t));
18 static int  __bamc_close __P((DBC *, db_pgno_t, int *));
19 static int  __bamc_del __P((DBC *, u_int32_t));
20 static int  __bamc_destroy __P((DBC *));
21 static int  __bamc_get __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
22 static int  __bamc_getstack __P((DBC *));
23 static int  __bamc_next __P((DBC *, int, int));
24 static int  __bamc_physdel __P((DBC *));
25 static int  __bamc_prev __P((DBC *));
26 static int  __bamc_put __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
27 static int  __bamc_search __P((DBC *,
28                 db_pgno_t, const DBT *, u_int32_t, int *));
29 static int  __bamc_writelock __P((DBC *));
30 static int  __bam_getboth_finddatum __P((DBC *, DBT *, u_int32_t));
31 static int  __bam_getbothc __P((DBC *, DBT *));
32 static int  __bam_get_prev __P((DBC *));
33 static int  __bam_isopd __P((DBC *, db_pgno_t *));
34 #ifdef HAVE_COMPRESSION
35 static int  __bam_getlte __P((DBC *, DBT *, DBT *));
36 #endif
37
38 /*
39  * Acquire a new page/lock.  If we hold a page/lock, discard the page, and
40  * lock-couple the lock.
41  *
42  * !!!
43  * We have to handle both where we have a lock to lock-couple and where we
44  * don't -- we don't duplicate locks when we duplicate cursors if we are
45  * running in a transaction environment as there's no point if locks are
46  * never discarded.  This means that the cursor may or may not hold a lock.
47  * In the case where we are descending the tree we always want to unlock
48  * the held interior page so we use ACQUIRE_COUPLE.
49  */
50 #undef  ACQUIRE
51 #define ACQUIRE(dbc, mode, lpgno, lock, fpgno, pagep, flags, ret) do {  \
52         DB_MPOOLFILE *__mpf = (dbc)->dbp->mpf;                          \
53         if ((pagep) != NULL) {                                          \
54                 ret = __memp_fput(__mpf,                                \
55                      (dbc)->thread_info, pagep, dbc->priority);         \
56                 pagep = NULL;                                           \
57         } else                                                          \
58                 ret = 0;                                                \
59         if ((ret) == 0 && STD_LOCKING(dbc))                             \
60                 ret = __db_lget(                                        \
61                     dbc, LCK_COUPLE, lpgno, mode, flags, &(lock));      \
62         if ((ret) == 0)                                                 \
63                 ret = __memp_fget(__mpf, &(fpgno),                      \
64                     (dbc)->thread_info, (dbc)->txn, 0, &(pagep));       \
65 } while (0)
66
67 /* Acquire a new page/lock for a cursor. */
68 #undef  ACQUIRE_CUR
69 #define ACQUIRE_CUR(dbc, mode, p, flags, ret) do {                      \
70         BTREE_CURSOR *__cp = (BTREE_CURSOR *)(dbc)->internal;           \
71         if (p != __cp->pgno)                                            \
72                 __cp->pgno = PGNO_INVALID;                              \
73         ACQUIRE(dbc, mode, p, __cp->lock, p, __cp->page, flags, ret);   \
74         if ((ret) == 0) {                                               \
75                 __cp->pgno = p;                                         \
76                 __cp->lock_mode = (mode);                               \
77         }                                                               \
78 } while (0)
79
80 /*
81  * Acquire a write lock if we don't already have one.
82  *
83  * !!!
84  * See ACQUIRE macro on why we handle cursors that don't have locks.
85  */
86 #undef  ACQUIRE_WRITE_LOCK
87 #define ACQUIRE_WRITE_LOCK(dbc, ret) do {                               \
88         BTREE_CURSOR *__cp = (BTREE_CURSOR *)(dbc)->internal;           \
89         DB_MPOOLFILE *__mpf = (dbc)->dbp->mpf;                          \
90         int __get_page = 0;                                             \
91         ret = 0;                                                        \
92         if (STD_LOCKING(dbc) && __cp->lock_mode != DB_LOCK_WRITE) {     \
93                 if (__cp->page != NULL) {                               \
94                         (ret) = __memp_fput(__mpf, (dbc)->thread_info,  \
95                             __cp->page, (dbc)->priority);               \
96                         __cp->page = NULL;                              \
97                         __get_page = 1;                                 \
98                         if ((ret) !=0)                                  \
99                                 break;                                  \
100                 }                                                       \
101                 if (((ret) = __db_lget((dbc),                           \
102                     LOCK_ISSET(__cp->lock) ? LCK_COUPLE : 0,            \
103                     __cp->pgno, DB_LOCK_WRITE, 0, &__cp->lock)) != 0)   \
104                         break;                                          \
105                 __cp->lock_mode = DB_LOCK_WRITE;                        \
106                 if (__get_page == 0)                                    \
107                         break;                                          \
108                 (ret) = __memp_fget(__mpf, &__cp->pgno,         \
109                     (dbc)->thread_info,                                 \
110                     (dbc)->txn, DB_MPOOL_DIRTY, &__cp->page);           \
111         }                                                               \
112 } while (0)
113
114 /* Discard the current page/lock for a cursor. */
115 #undef  DISCARD_CUR
116 #define DISCARD_CUR(dbc, ret) do {                                      \
117         BTREE_CURSOR *__cp = (BTREE_CURSOR *)(dbc)->internal;           \
118         DB_MPOOLFILE *__mpf = (dbc)->dbp->mpf;                          \
119         int __t_ret;                                                    \
120         if ((__cp->page) != NULL) {                                     \
121                 __t_ret = __memp_fput(__mpf,                            \
122                      (dbc)->thread_info, __cp->page, dbc->priority);\
123                 __cp->page = NULL;                                      \
124         } else                                                          \
125                 __t_ret = 0;                                            \
126         if (__t_ret != 0 && (ret) == 0)                                 \
127                 ret = __t_ret;                                          \
128         __t_ret = __TLPUT((dbc), __cp->lock);                           \
129         if (__t_ret != 0 && (ret) == 0)                                 \
130                 ret = __t_ret;                                          \
131         if ((ret) == 0 && !LOCK_ISSET(__cp->lock))                      \
132                 __cp->lock_mode = DB_LOCK_NG;                           \
133         __cp->stream_start_pgno = PGNO_INVALID;                         \
134 } while (0)
135
136 /* If on-page item is a deleted record. */
137 #undef  IS_DELETED
138 #define IS_DELETED(dbp, page, indx)                                     \
139         B_DISSET(GET_BKEYDATA(dbp, page,                                \
140             (indx) + (TYPE(page) == P_LBTREE ? O_INDX : 0))->type)
141 #undef  IS_CUR_DELETED
142 #define IS_CUR_DELETED(dbc)                                             \
143         IS_DELETED((dbc)->dbp, (dbc)->internal->page, (dbc)->internal->indx)
144
145 /*
146  * Test to see if two cursors could point to duplicates of the same key.
147  * In the case of off-page duplicates they are they same, as the cursors
148  * will be in the same off-page duplicate tree.  In the case of on-page
149  * duplicates, the key index offsets must be the same.  For the last test,
150  * as the original cursor may not have a valid page pointer, we use the
151  * current cursor's.
152  */
153 #undef  IS_DUPLICATE
154 #define IS_DUPLICATE(dbc, i1, i2)                                       \
155             (P_INP((dbc)->dbp,((PAGE *)(dbc)->internal->page))[i1] ==   \
156              P_INP((dbc)->dbp,((PAGE *)(dbc)->internal->page))[i2])
157 #undef  IS_CUR_DUPLICATE
158 #define IS_CUR_DUPLICATE(dbc, orig_pgno, orig_indx)                     \
159         (F_ISSET(dbc, DBC_OPD) ||                                       \
160             (orig_pgno == (dbc)->internal->pgno &&                      \
161             IS_DUPLICATE(dbc, (dbc)->internal->indx, orig_indx)))
162
163 /*
164  * __bamc_init --
165  *      Initialize the access private portion of a cursor
166  *
167  * PUBLIC: int __bamc_init __P((DBC *, DBTYPE));
168  */
169 int
170 __bamc_init(dbc, dbtype)
171         DBC *dbc;
172         DBTYPE dbtype;
173 {
174         ENV *env;
175         int ret;
176 #ifdef HAVE_COMPRESSION
177         BTREE_CURSOR *cp;
178 #endif
179
180         env = dbc->env;
181
182         /* Allocate/initialize the internal structure. */
183         if (dbc->internal == NULL) {
184                 if ((ret = __os_calloc(
185                     env, 1, sizeof(BTREE_CURSOR), &dbc->internal)) != 0)
186                         return (ret);
187
188 #ifdef HAVE_COMPRESSION
189                 cp = (BTREE_CURSOR*)dbc->internal;
190                 cp->compressed.flags = DB_DBT_USERMEM;
191                 cp->key1.flags = DB_DBT_USERMEM;
192                 cp->key2.flags = DB_DBT_USERMEM;
193                 cp->data1.flags = DB_DBT_USERMEM;
194                 cp->data2.flags = DB_DBT_USERMEM;
195                 cp->del_key.flags = DB_DBT_USERMEM;
196                 cp->del_data.flags = DB_DBT_USERMEM;
197 #endif
198         }
199
200         /* Initialize methods. */
201         dbc->close = dbc->c_close = __dbc_close_pp;
202         dbc->cmp = __dbc_cmp_pp;
203         dbc->count = dbc->c_count = __dbc_count_pp;
204         dbc->del = dbc->c_del = __dbc_del_pp;
205         dbc->dup = dbc->c_dup = __dbc_dup_pp;
206         dbc->get = dbc->c_get = __dbc_get_pp;
207         dbc->pget = dbc->c_pget = __dbc_pget_pp;
208         dbc->put = dbc->c_put = __dbc_put_pp;
209         if (dbtype == DB_BTREE) {
210                 dbc->am_bulk = __bam_bulk;
211                 dbc->am_close = __bamc_close;
212                 dbc->am_del = __bamc_del;
213                 dbc->am_destroy = __bamc_destroy;
214                 dbc->am_get = __bamc_get;
215                 dbc->am_put = __bamc_put;
216                 dbc->am_writelock = __bamc_writelock;
217         } else {
218                 dbc->am_bulk = __bam_bulk;
219                 dbc->am_close = __bamc_close;
220                 dbc->am_del = __ramc_del;
221                 dbc->am_destroy = __bamc_destroy;
222                 dbc->am_get = __ramc_get;
223                 dbc->am_put = __ramc_put;
224                 dbc->am_writelock = __bamc_writelock;
225         }
226
227         return (0);
228 }
229
230 /*
231  * __bamc_refresh
232  *      Set things up properly for cursor re-use.
233  *
234  * PUBLIC: int __bamc_refresh __P((DBC *));
235  */
236 int
237 __bamc_refresh(dbc)
238         DBC *dbc;
239 {
240         BTREE *t;
241         BTREE_CURSOR *cp;
242         DB *dbp;
243
244         dbp = dbc->dbp;
245         t = dbp->bt_internal;
246         cp = (BTREE_CURSOR *)dbc->internal;
247
248         /*
249          * If our caller set the root page number, it's because the root was
250          * known.  This is always the case for off page dup cursors.  Else,
251          * pull it out of our internal information, unless this is a subdb.
252          */
253         if (cp->root == PGNO_INVALID && t->bt_meta == PGNO_BASE_MD)
254                 cp->root = t->bt_root;
255
256         LOCK_INIT(cp->lock);
257         cp->lock_mode = DB_LOCK_NG;
258
259         if (cp->sp == NULL) {
260                 cp->sp = cp->stack;
261                 cp->esp = cp->stack + sizeof(cp->stack) / sizeof(cp->stack[0]);
262         }
263         BT_STK_CLR(cp);
264
265 #ifdef HAVE_COMPRESSION
266         /* Initialize compression */
267         cp->prevKey = 0;
268         cp->prevData = 0;
269         cp->currentKey = 0;
270         cp->currentData = 0;
271         cp->compcursor = 0;
272         cp->compend = 0;
273         cp->prevcursor = 0;
274         cp->prev2cursor = 0;
275 #endif
276
277         /*
278          * The btree leaf page data structures require that two key/data pairs
279          * (or four items) fit on a page, but other than that there's no fixed
280          * requirement.  The btree off-page duplicates only require two items,
281          * to be exact, but requiring four for them as well seems reasonable.
282          *
283          * Recno uses the btree bt_ovflsize value -- it's close enough.
284          */
285         cp->ovflsize = B_MINKEY_TO_OVFLSIZE(
286             dbp,  F_ISSET(dbc, DBC_OPD) ? 2 : t->bt_minkey, dbp->pgsize);
287
288         cp->recno = RECNO_OOB;
289         cp->order = INVALID_ORDER;
290         cp->flags = 0;
291
292         /* Initialize for record numbers. */
293         if (F_ISSET(dbc, DBC_OPD) ||
294             dbc->dbtype == DB_RECNO || F_ISSET(dbp, DB_AM_RECNUM)) {
295                 F_SET(cp, C_RECNUM);
296
297                 /*
298                  * All btrees that support record numbers, optionally standard
299                  * recno trees, and all off-page duplicate recno trees have
300                  * mutable record numbers.
301                  */
302                 if ((F_ISSET(dbc, DBC_OPD) && dbc->dbtype == DB_RECNO) ||
303                     F_ISSET(dbp, DB_AM_RECNUM | DB_AM_RENUMBER))
304                         F_SET(cp, C_RENUMBER);
305         }
306
307         return (0);
308 }
309
310 /*
311  * __bamc_close --
312  *      Close down the cursor.
313  */
314 static int
315 __bamc_close(dbc, root_pgno, rmroot)
316         DBC *dbc;
317         db_pgno_t root_pgno;
318         int *rmroot;
319 {
320         BTREE_CURSOR *cp, *cp_opd, *cp_c;
321         DB *dbp;
322         DBC *dbc_opd, *dbc_c;
323         DB_MPOOLFILE *mpf;
324         ENV *env;
325         PAGE *h;
326         int cdb_lock, ret;
327         u_int32_t count;
328
329         dbp = dbc->dbp;
330         env = dbp->env;
331         mpf = dbp->mpf;
332         cp = (BTREE_CURSOR *)dbc->internal;
333         cp_opd = (dbc_opd = cp->opd) == NULL ?
334             NULL : (BTREE_CURSOR *)dbc_opd->internal;
335         cdb_lock = ret = 0;
336
337         /*
338          * There are 3 ways this function is called:
339          *
340          * 1. Closing a primary cursor: we get called with a pointer to a
341          *    primary cursor that has a NULL opd field.  This happens when
342          *    closing a btree/recno database cursor without an associated
343          *    off-page duplicate tree.
344          *
345          * 2. Closing a primary and an off-page duplicate cursor stack: we
346          *    get called with a pointer to the primary cursor which has a
347          *    non-NULL opd field.  This happens when closing a btree cursor
348          *    into database with an associated off-page btree/recno duplicate
349          *    tree. (It can't be a primary recno database, recno databases
350          *    don't support duplicates.)
351          *
352          * 3. Closing an off-page duplicate cursor stack: we get called with
353          *    a pointer to the off-page duplicate cursor.  This happens when
354          *    closing a non-btree database that has an associated off-page
355          *    btree/recno duplicate tree or for a btree database when the
356          *    opd tree is not empty (root_pgno == PGNO_INVALID).
357          *
358          * If either the primary or off-page duplicate cursor deleted a btree
359          * key/data pair, check to see if the item is still referenced by a
360          * different cursor.  If it is, confirm that cursor's delete flag is
361          * set and leave it to that cursor to do the delete.
362          *
363          * NB: The test for == 0 below is correct.  Our caller already removed
364          * our cursor argument from the active queue, we won't find it when we
365          * search the queue in __bam_ca_delete().
366          * NB: It can't be true that both the primary and off-page duplicate
367          * cursors have deleted a btree key/data pair.  Either the primary
368          * cursor may have deleted an item and there's no off-page duplicate
369          * cursor, or there's an off-page duplicate cursor and it may have
370          * deleted an item.
371          *
372          * Primary recno databases aren't an issue here.  Recno keys are either
373          * deleted immediately or never deleted, and do not have to be handled
374          * here.
375          *
376          * Off-page duplicate recno databases are an issue here, cases #2 and
377          * #3 above can both be off-page recno databases.  The problem is the
378          * same as the final problem for off-page duplicate btree databases.
379          * If we no longer need the off-page duplicate tree, we want to remove
380          * it.  For off-page duplicate btrees, we are done with the tree when
381          * we delete the last item it contains, i.e., there can be no further
382          * references to it when it's empty.  For off-page duplicate recnos,
383          * we remove items from the tree as the application calls the remove
384          * function, so we are done with the tree when we close the last cursor
385          * that references it.
386          *
387          * We optionally take the root page number from our caller.  If the
388          * primary database is a btree, we can get it ourselves because dbc
389          * is the primary cursor.  If the primary database is not a btree,
390          * the problem is that we may be dealing with a stack of pages.  The
391          * cursor we're using to do the delete points at the bottom of that
392          * stack and we need the top of the stack.
393          */
394         if (F_ISSET(cp, C_DELETED)) {
395                 dbc_c = dbc;
396                 switch (dbc->dbtype) {
397                 case DB_BTREE:                          /* Case #1, #3. */
398                         if ((ret = __bam_ca_delete(
399                             dbp, cp->pgno, cp->indx, 1, &count)) != 0)
400                                 goto err;
401                         if (count == 0)
402                                 goto lock;
403                         goto done;
404                 case DB_RECNO:
405                         if (!F_ISSET(dbc, DBC_OPD))     /* Case #1. */
406                                 goto done;
407                                                         /* Case #3. */
408                         if ((ret = __ram_ca_delete(dbp, cp->root, &count)) != 0)
409                                 goto err;
410                         if (count == 0)
411                                 goto lock;
412                         goto done;
413                 case DB_HASH:
414                 case DB_QUEUE:
415                 case DB_UNKNOWN:
416                 default:
417                         ret = __db_unknown_type(
418                             env, "DbCursor.close", dbc->dbtype);
419                         goto err;
420                 }
421         }
422
423         if (dbc_opd == NULL)
424                 goto done;
425
426         if (F_ISSET(cp_opd, C_DELETED)) {               /* Case #2. */
427                 /*
428                  * We will not have been provided a root page number.  Acquire
429                  * one from the primary database.
430                  */
431                 if ((h = cp->page) == NULL && (ret = __memp_fget(mpf, &cp->pgno,
432                      dbc->thread_info, dbc->txn, 0, &h)) != 0)
433                         goto err;
434                 root_pgno = GET_BOVERFLOW(dbp, h, cp->indx + O_INDX)->pgno;
435                 if ((ret = __memp_fput(mpf,
436                      dbc->thread_info, h, dbc->priority)) != 0)
437                         goto err;
438                 cp->page = NULL;
439
440                 dbc_c = dbc_opd;
441                 switch (dbc_opd->dbtype) {
442                 case DB_BTREE:
443                         if ((ret = __bam_ca_delete(
444                             dbp, cp_opd->pgno, cp_opd->indx, 1, &count)) != 0)
445                                 goto err;
446                         if (count == 0)
447                                 goto lock;
448                         goto done;
449                 case DB_RECNO:
450                         if ((ret =
451                             __ram_ca_delete(dbp, cp_opd->root, &count)) != 0)
452                                 goto err;
453                         if (count == 0)
454                                 goto lock;
455                         goto done;
456                 case DB_HASH:
457                 case DB_QUEUE:
458                 case DB_UNKNOWN:
459                 default:
460                         ret = __db_unknown_type(
461                            env, "DbCursor.close", dbc->dbtype);
462                         goto err;
463                 }
464         }
465         goto done;
466
467 lock:   cp_c = (BTREE_CURSOR *)dbc_c->internal;
468
469         /*
470          * If this is CDB, upgrade the lock if necessary.  While we acquired
471          * the write lock to logically delete the record, we released it when
472          * we returned from that call, and so may not be holding a write lock
473          * at the moment.
474          */
475         if (CDB_LOCKING(env)) {
476                 if (F_ISSET(dbc, DBC_WRITECURSOR)) {
477                         if ((ret = __lock_get(env,
478                             dbc->locker, DB_LOCK_UPGRADE, &dbc->lock_dbt,
479                             DB_LOCK_WRITE, &dbc->mylock)) != 0)
480                                 goto err;
481                         cdb_lock = 1;
482                 }
483                 goto do_del;
484         }
485
486         /*
487          * The variable dbc_c has been initialized to reference the cursor in
488          * which we're going to do the delete.  Initialize the cursor's lock
489          * structures as necessary.
490          *
491          * First, we may not need to acquire any locks.  If we're in case #3,
492          * that is, the primary database isn't a btree database, our caller
493          * is responsible for acquiring any necessary locks before calling us.
494          */
495         if (F_ISSET(dbc, DBC_OPD))
496                 goto do_del;
497
498         /*
499          * Otherwise, acquire a write lock on the primary database's page.
500          *
501          * Lock the primary database page, regardless of whether we're deleting
502          * an item on a primary database page or an off-page duplicates page.
503          *
504          * If the cursor that did the initial logical deletion (and had a write
505          * lock) is not the same cursor doing the physical deletion (which may
506          * have only ever had a read lock on the item), we need to upgrade to a
507          * write lock.  The confusion comes as follows:
508          *
509          *      C1      created, acquires item read lock
510          *      C2      dup C1, create C2, also has item read lock.
511          *      C1      acquire write lock, delete item
512          *      C1      close
513          *      C2      close, needs a write lock to physically delete item.
514          *
515          * If we're in a TXN, we know that C2 will be able to acquire the write
516          * lock, because no locker other than the one shared by C1 and C2 can
517          * acquire a write lock -- the original write lock C1 acquired was never
518          * discarded.
519          *
520          * If we're not in a TXN, it's nastier.  Other cursors might acquire
521          * read locks on the item after C1 closed, discarding its write lock,
522          * and such locks would prevent C2 from acquiring a read lock.  That's
523          * OK, though, we'll simply wait until we can acquire a write lock, or
524          * we'll deadlock.  (Which better not happen, since we're not in a TXN.)
525          *
526          * There are similar scenarios with dirty reads, where the cursor may
527          * have downgraded its write lock to a was-write lock.
528          */
529         if (STD_LOCKING(dbc))
530                 if ((ret = __db_lget(dbc,
531                     LCK_COUPLE, cp->pgno, DB_LOCK_WRITE, 0, &cp->lock)) != 0)
532                         goto err;
533
534 do_del: /*
535          * If the delete occurred in a Btree, we're going to look at the page
536          * to see if the item has to be physically deleted.  Otherwise, we do
537          * not need the actual page (and it may not even exist, it might have
538          * been truncated from the file after an allocation aborted).
539          *
540          * Delete the on-page physical item referenced by the cursor.
541          */
542         if (F_ISSET(dbc_c, DBC_OPD))
543                 LOCK_CHECK_OFF(dbc_c->thread_info);
544         if (dbc_c->dbtype == DB_BTREE) {
545                 if ((ret = __memp_fget(mpf, &cp_c->pgno, dbc->thread_info,
546                     dbc->txn, DB_MPOOL_DIRTY, &cp_c->page)) != 0)
547                         goto err_c;
548                 if ((ret = __bamc_physdel(dbc_c)) != 0)
549                         goto err_c;
550         }
551
552         /*
553          * If we're not working in an off-page duplicate tree, then we're
554          * done.
555          */
556         if (!F_ISSET(dbc_c, DBC_OPD) || root_pgno == PGNO_INVALID)
557                 goto done;
558
559         /*
560          * We may have just deleted the last element in the off-page duplicate
561          * tree, and closed the last cursor in the tree.  For an off-page btree
562          * there are no other cursors in the tree by definition, if the tree is
563          * empty.  For an off-page recno we know we have closed the last cursor
564          * in the tree because the __ram_ca_delete call above returned 0 only
565          * in that case.  So, if the off-page duplicate tree is empty at this
566          * point, we want to remove it.
567          */
568         if (((h = dbc_c->internal->page) == NULL || h->pgno != root_pgno) &&
569             (ret = __memp_fget(mpf,
570             &root_pgno, dbc->thread_info, dbc->txn, 0, &h)) != 0)
571                 goto err_c;
572         if ((count = NUM_ENT(h)) == 0) {
573                 if (h != dbc_c->internal->page)
574                         DISCARD_CUR(dbc_c, ret);
575                 else
576                         dbc_c->internal->page = NULL;
577                 if (ret == 0)
578                         ret = __db_free(dbc, h, 0);
579         } else if (h != dbc_c->internal->page)
580                 ret = __memp_fput(mpf, dbc->thread_info, h, dbc->priority);
581
582 err_c:  if (F_ISSET(dbc_c, DBC_OPD))
583                 LOCK_CHECK_ON(dbc_c->thread_info);
584         if (ret != 0)
585                 goto err;
586
587         if (count != 0)
588                 goto done;
589
590         /*
591          * When removing the tree, we have to do one of two things.  If this is
592          * case #2, that is, the primary tree is a btree, delete the key that's
593          * associated with the tree from the btree leaf page.  We know we are
594          * the only reference to it and we already have the correct lock.  We
595          * detect this case because the cursor that was passed to us references
596          * an off-page duplicate cursor.
597          *
598          * If this is case #3, that is, the primary tree isn't a btree, pass
599          * the information back to our caller, it's their job to do cleanup on
600          * the primary page.
601          */
602         if (dbc_opd != NULL) {
603                 if ((ret = __memp_fget(mpf, &cp->pgno, dbc->thread_info,
604                     dbc->txn, DB_MPOOL_DIRTY, &cp->page)) != 0)
605                         goto err;
606                 if ((ret = __bamc_physdel(dbc)) != 0)
607                         goto err;
608         } else
609                 *rmroot = 1;
610 err:
611 done:   /*
612          * Discard the page references and locks, and confirm that the stack
613          * has been emptied.
614          */
615         if (dbc_opd != NULL)
616                 DISCARD_CUR(dbc_opd, ret);
617         DISCARD_CUR(dbc, ret);
618
619         /* Downgrade any CDB lock we acquired. */
620         if (cdb_lock)
621                 (void)__lock_downgrade(env, &dbc->mylock, DB_LOCK_IWRITE, 0);
622
623         return (ret);
624 }
625
626 /*
627  * __bamc_cmp --
628  *      Compare two btree cursors for equality.
629  *
630  * This function is only called with two cursors that point to the same item.
631  * It only distinguishes cursors pointing to deleted and undeleted items at
632  * the same location.
633  *
634  * PUBLIC: int __bamc_cmp __P((DBC *, DBC *, int *));
635  */
636 int
637 __bamc_cmp(dbc, other_dbc, result)
638         DBC *dbc, *other_dbc;
639         int *result;
640 {
641         ENV *env;
642         BTREE_CURSOR *bcp, *obcp;
643
644         env = dbc->env;
645         bcp = (BTREE_CURSOR *)dbc->internal;
646         obcp = (BTREE_CURSOR *)other_dbc->internal;
647
648         DB_ASSERT (env, bcp->pgno == obcp->pgno);
649         DB_ASSERT (env, bcp->indx == obcp->indx);
650
651         /* Check to see if both cursors have the same deleted flag. */
652         *result =
653             ((F_ISSET(bcp, C_DELETED)) == F_ISSET(obcp, C_DELETED)) ? 0 : 1;
654         return (0);
655 }
656
657 /*
658  * __bamc_destroy --
659  *      Close a single cursor -- internal version.
660  */
661 static int
662 __bamc_destroy(dbc)
663         DBC *dbc;
664 {
665         BTREE_CURSOR *cp;
666         ENV *env;
667
668         cp = (BTREE_CURSOR *)dbc->internal;
669         env = dbc->env;
670
671         /* Discard the structures. */
672         if (cp->sp != cp->stack)
673                 __os_free(env, cp->sp);
674
675 #ifdef HAVE_COMPRESSION
676         /* Free the memory used for compression */
677         __os_free(env, cp->compressed.data);
678         __os_free(env, cp->key1.data);
679         __os_free(env, cp->key2.data);
680         __os_free(env, cp->data1.data);
681         __os_free(env, cp->data2.data);
682         __os_free(env, cp->del_key.data);
683         __os_free(env, cp->del_data.data);
684 #endif
685
686         __os_free(env, cp);
687
688         return (0);
689 }
690
691 /*
692  * __bamc_count --
693  *      Return a count of on and off-page duplicates.
694  *
695  * PUBLIC: int __bamc_count __P((DBC *, db_recno_t *));
696  */
697 int
698 __bamc_count(dbc, recnop)
699         DBC *dbc;
700         db_recno_t *recnop;
701 {
702         BTREE_CURSOR *cp;
703         DB *dbp;
704         DB_MPOOLFILE *mpf;
705         db_indx_t indx, top;
706         db_recno_t recno;
707         int ret;
708
709         dbp = dbc->dbp;
710         mpf = dbp->mpf;
711         cp = (BTREE_CURSOR *)dbc->internal;
712
713         /*
714          * Called with the top-level cursor that may reference an off-page
715          * duplicates tree.  We don't have to acquire any new locks, we have
716          * to have a read lock to even get here.
717          */
718         if (cp->opd == NULL) {
719                 /*
720                  * On-page duplicates, get the page and count.
721                  */
722                 DB_ASSERT(dbp->env, cp->page == NULL);
723                 if ((ret = __memp_fget(mpf, &cp->pgno,
724                     dbc->thread_info, dbc->txn, 0, &cp->page)) != 0)
725                         return (ret);
726
727                 /*
728                  * Move back to the beginning of the set of duplicates and
729                  * then count forward.
730                  */
731                 for (indx = cp->indx;; indx -= P_INDX)
732                         if (indx == 0 ||
733                             !IS_DUPLICATE(dbc, indx, indx - P_INDX))
734                                 break;
735                 for (recno = 0,
736                     top = NUM_ENT(cp->page) - P_INDX;; indx += P_INDX) {
737                         if (!IS_DELETED(dbp, cp->page, indx))
738                                 ++recno;
739                         if (indx == top ||
740                             !IS_DUPLICATE(dbc, indx, indx + P_INDX))
741                                 break;
742                 }
743         } else {
744                 /*
745                  * Off-page duplicates tree, get the root page of the off-page
746                  * duplicate tree.
747                  */
748                 if ((ret = __memp_fget(mpf, &cp->opd->internal->root,
749                     dbc->thread_info, dbc->txn, 0, &cp->page)) != 0)
750                         return (ret);
751
752                 /*
753                  * If the page is an internal page use the page's count as it's
754                  * up-to-date and reflects the status of cursors in the tree.
755                  * If the page is a leaf page for unsorted duplicates, use the
756                  * page's count as cursors don't mark items deleted on the page
757                  * and wait, cursor delete items immediately.
758                  * If the page is a leaf page for sorted duplicates, there may
759                  * be cursors on the page marking deleted items -- count.
760                  */
761                 if (TYPE(cp->page) == P_LDUP)
762                         for (recno = 0, indx = 0,
763                             top = NUM_ENT(cp->page) - O_INDX;; indx += O_INDX) {
764                                 if (!IS_DELETED(dbp, cp->page, indx))
765                                         ++recno;
766                                 if (indx == top)
767                                         break;
768                         }
769                 else
770                         recno = RE_NREC(cp->page);
771         }
772
773         *recnop = recno;
774
775         ret = __memp_fput(mpf, dbc->thread_info, cp->page, dbc->priority);
776         cp->page = NULL;
777
778         return (ret);
779 }
780
781 /*
782  * __bamc_del --
783  *      Delete using a cursor.
784  */
785 static int
786 __bamc_del(dbc, flags)
787         DBC *dbc;
788         u_int32_t flags;
789 {
790         BTREE_CURSOR *cp;
791         DB *dbp;
792         DB_MPOOLFILE *mpf;
793         int ret, t_ret;
794         u_int32_t count;
795
796         dbp = dbc->dbp;
797         mpf = dbp->mpf;
798         cp = (BTREE_CURSOR *)dbc->internal;
799         ret = 0;
800         COMPQUIET(flags, 0);
801
802         /* If the item was already deleted, return failure. */
803         if (F_ISSET(cp, C_DELETED))
804                 return (DB_KEYEMPTY);
805
806         /*
807          * This code is always called with a page lock but no page.
808          */
809         DB_ASSERT(dbp->env, cp->page == NULL);
810
811         if (F_ISSET(dbc, DBC_OPD))
812                 LOCK_CHECK_OFF(dbc->thread_info);
813
814         /*
815          * We don't physically delete the record until the cursor moves, so
816          * we have to have a long-lived write lock on the page instead of a
817          * a long-lived read lock.  Note, we have to have a read lock to even
818          * get here.
819          *
820          * If we're maintaining record numbers, we lock the entire tree, else
821          * we lock the single page.
822          */
823         if (F_ISSET(cp, C_RECNUM)) {
824                 if ((ret = __bamc_getstack(dbc)) != 0)
825                         goto err;
826                 cp->page = cp->csp->page;
827         } else {
828                 ACQUIRE_CUR(dbc, DB_LOCK_WRITE, cp->pgno, 0, ret);
829                 if (ret != 0)
830                         goto err;
831         }
832
833         /* Mark the page dirty. */
834         if ((ret = __memp_dirty(mpf,
835             &cp->page, dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
836                 goto err;
837
838         /* Log the change. */
839         if (DBC_LOGGING(dbc)) {
840                 if ((ret = __bam_cdel_log(dbp, dbc->txn, &LSN(cp->page), 0,
841                     PGNO(cp->page), &LSN(cp->page), cp->indx)) != 0)
842                         goto err;
843         } else
844                 LSN_NOT_LOGGED(LSN(cp->page));
845
846         /* Set the intent-to-delete flag on the page. */
847         if (TYPE(cp->page) == P_LBTREE)
848                 B_DSET(GET_BKEYDATA(dbp, cp->page, cp->indx + O_INDX)->type);
849         else
850                 B_DSET(GET_BKEYDATA(dbp, cp->page, cp->indx)->type);
851
852 err:    /*
853          * If we've been successful so far and the tree has record numbers,
854          * adjust the record counts.  Either way, release acquired page(s).
855          */
856         if (F_ISSET(cp, C_RECNUM)) {
857                 cp->csp->page = cp->page;
858                 if (ret == 0)
859                         ret = __bam_adjust(dbc, -1);
860                 (void)__bam_stkrel(dbc, 0);
861         } else
862                 if (cp->page != NULL &&
863                     (t_ret = __memp_fput(mpf, dbc->thread_info,
864                     cp->page, dbc->priority)) != 0 && ret == 0)
865                         ret = t_ret;
866
867         cp->page = NULL;
868
869         /*
870          * Update the cursors last, after all chance of recoverable failure
871          * is past.
872          */
873         if (ret == 0)
874                 ret = __bam_ca_delete(dbp, cp->pgno, cp->indx, 1, &count);
875
876         if (F_ISSET(dbc, DBC_OPD))
877                 LOCK_CHECK_ON(dbc->thread_info);
878         return (ret);
879 }
880
881 /*
882  * __bamc_dup --
883  *      Duplicate a btree cursor, such that the new one holds appropriate
884  *      locks for the position of the original.
885  *
886  * PUBLIC: int __bamc_dup __P((DBC *, DBC *, u_int32_t));
887  */
888 int
889 __bamc_dup(orig_dbc, new_dbc, flags)
890         DBC *orig_dbc, *new_dbc;
891         u_int32_t flags;
892 {
893         BTREE_CURSOR *orig, *new;
894
895         orig = (BTREE_CURSOR *)orig_dbc->internal;
896         new = (BTREE_CURSOR *)new_dbc->internal;
897
898         new->ovflsize = orig->ovflsize;
899         new->recno = orig->recno;
900         new->flags = orig->flags;
901
902 #ifdef HAVE_COMPRESSION
903         /* Copy the compression state */
904         return (__bamc_compress_dup(orig_dbc, new_dbc, flags));
905 #else
906         COMPQUIET(flags, 0);
907
908         return (0);
909 #endif
910 }
911
912 /*
913  * __bamc_get --
914  *      Get using a cursor (btree).
915  */
916 static int
917 __bamc_get(dbc, key, data, flags, pgnop)
918         DBC *dbc;
919         DBT *key, *data;
920         u_int32_t flags;
921         db_pgno_t *pgnop;
922 {
923         BTREE_CURSOR *cp;
924         DB *dbp;
925         DB_MPOOLFILE *mpf;
926         db_pgno_t orig_pgno;
927         db_indx_t orig_indx;
928         int exact, newopd, ret;
929
930         dbp = dbc->dbp;
931         mpf = dbp->mpf;
932         cp = (BTREE_CURSOR *)dbc->internal;
933         orig_pgno = cp->pgno;
934         orig_indx = cp->indx;
935
936         newopd = 0;
937         switch (flags) {
938         case DB_CURRENT:
939                 /* It's not possible to return a deleted record. */
940                 if (F_ISSET(cp, C_DELETED)) {
941                         ret = DB_KEYEMPTY;
942                         goto err;
943                 }
944
945                 /*
946                  * Acquire the current page.  We have at least a read-lock
947                  * already.  The caller may have set DB_RMW asking for a
948                  * write lock, but upgrading to a write lock has no better
949                  * chance of succeeding now instead of later, so don't try.
950                  */
951                 if ((ret = __memp_fget(mpf, &cp->pgno,
952                     dbc->thread_info, dbc->txn, 0, &cp->page)) != 0)
953                         goto err;
954                 break;
955         case DB_FIRST:
956                 newopd = 1;
957                 if ((ret = __bamc_search(dbc,
958                      PGNO_INVALID, NULL, flags, &exact)) != 0)
959                         goto err;
960                 break;
961         case DB_GET_BOTH:
962         case DB_GET_BOTH_RANGE:
963                 /*
964                  * There are two ways to get here based on DBcursor->get
965                  * with the DB_GET_BOTH/DB_GET_BOTH_RANGE flags set:
966                  *
967                  * 1. Searching a sorted off-page duplicate tree: do a tree
968                  * search.
969                  *
970                  * 2. Searching btree: do a tree search.  If it returns a
971                  * reference to off-page duplicate tree, return immediately
972                  * and let our caller deal with it.  If the search doesn't
973                  * return a reference to off-page duplicate tree, continue
974                  * with an on-page search.
975                  */
976                 if (F_ISSET(dbc, DBC_OPD)) {
977                         if ((ret = __bamc_search(
978                             dbc, PGNO_INVALID, data, flags, &exact)) != 0)
979                                 goto err;
980                         if (flags == DB_GET_BOTH) {
981                                 if (!exact) {
982                                         ret = DB_NOTFOUND;
983                                         goto err;
984                                 }
985                                 break;
986                         }
987
988                         /*
989                          * We didn't require an exact match, so the search may
990                          * may have returned an entry past the end of the page,
991                          * or we may be referencing a deleted record.  If so,
992                          * move to the next entry.
993                          */
994                         if ((cp->indx == NUM_ENT(cp->page) ||
995                             IS_CUR_DELETED(dbc)) &&
996                             (ret = __bamc_next(dbc, 1, 0)) != 0)
997                                 goto err;
998                 } else {
999                         if ((ret = __bamc_search(
1000                             dbc, PGNO_INVALID, key, flags, &exact)) != 0)
1001                                 return (ret);
1002                         if (!exact) {
1003                                 ret = DB_NOTFOUND;
1004                                 goto err;
1005                         }
1006
1007                         if (pgnop != NULL && __bam_isopd(dbc, pgnop)) {
1008                                 newopd = 1;
1009                                 break;
1010                         }
1011                         if ((ret =
1012                             __bam_getboth_finddatum(dbc, data, flags)) != 0)
1013                                 goto err;
1014                 }
1015                 break;
1016 #ifdef HAVE_COMPRESSION
1017         case DB_SET_LTE:
1018                 if ((ret = __bam_getlte(dbc, key, NULL)) != 0)
1019                         goto err;
1020                 break;
1021         case DB_GET_BOTH_LTE:
1022                 if ((ret = __bam_getlte(dbc, key, data)) != 0)
1023                         goto err;
1024                 break;
1025 #endif
1026         case DB_GET_BOTHC:
1027                 if ((ret = __bam_getbothc(dbc, data)) != 0)
1028                         goto err;
1029                 break;
1030         case DB_LAST:
1031                 newopd = 1;
1032                 if ((ret = __bamc_search(dbc,
1033                      PGNO_INVALID, NULL, flags, &exact)) != 0)
1034                         goto err;
1035                 break;
1036         case DB_NEXT:
1037                 newopd = 1;
1038                 if (cp->pgno == PGNO_INVALID) {
1039                         if ((ret = __bamc_search(dbc,
1040                              PGNO_INVALID, NULL, DB_FIRST, &exact)) != 0)
1041                                 goto err;
1042                 } else
1043                         if ((ret = __bamc_next(dbc, 1, 0)) != 0)
1044                                 goto err;
1045                 break;
1046         case DB_NEXT_DUP:
1047                 if ((ret = __bamc_next(dbc, 1, 0)) != 0)
1048                         goto err;
1049                 if (!IS_CUR_DUPLICATE(dbc, orig_pgno, orig_indx)) {
1050                         ret = DB_NOTFOUND;
1051                         goto err;
1052                 }
1053                 break;
1054         case DB_NEXT_NODUP:
1055                 newopd = 1;
1056                 if (cp->pgno == PGNO_INVALID) {
1057                         if ((ret = __bamc_search(dbc,
1058                              PGNO_INVALID, NULL, DB_FIRST, &exact)) != 0)
1059                                 goto err;
1060                 } else
1061                         do {
1062                                 if ((ret = __bamc_next(dbc, 1, 0)) != 0)
1063                                         goto err;
1064                         } while (IS_CUR_DUPLICATE(dbc, orig_pgno, orig_indx));
1065                 break;
1066         case DB_PREV:
1067                 newopd = 1;
1068                 if (cp->pgno == PGNO_INVALID) {
1069                         if ((ret = __bamc_search(dbc,
1070                              PGNO_INVALID, NULL, DB_LAST, &exact)) != 0)
1071                                 goto err;
1072                 } else
1073                         if ((ret = __bamc_prev(dbc)) != 0)
1074                                 goto err;
1075                 break;
1076         case DB_PREV_DUP:
1077                 if ((ret = __bamc_prev(dbc)) != 0)
1078                         goto err;
1079                 if (!IS_CUR_DUPLICATE(dbc, orig_pgno, orig_indx)) {
1080                         ret = DB_NOTFOUND;
1081                         goto err;
1082                 }
1083                 break;
1084         case DB_PREV_NODUP:
1085                 newopd = 1;
1086                 if (cp->pgno == PGNO_INVALID) {
1087                         if ((ret = __bamc_search(dbc,
1088                              PGNO_INVALID, NULL, DB_LAST, &exact)) != 0)
1089                                 goto err;
1090                 } else
1091                         do {
1092                                 if ((ret = __bamc_prev(dbc)) != 0)
1093                                         goto err;
1094                         } while (IS_CUR_DUPLICATE(dbc, orig_pgno, orig_indx));
1095                 break;
1096         case DB_SET:
1097         case DB_SET_RECNO:
1098                 newopd = 1;
1099                 if ((ret = __bamc_search(dbc,
1100                     PGNO_INVALID, key, flags, &exact)) != 0)
1101                         goto err;
1102                 break;
1103         case DB_SET_RANGE:
1104                 newopd = 1;
1105                 if ((ret = __bamc_search(dbc,
1106                     PGNO_INVALID, key, flags, &exact)) != 0)
1107                         goto err;
1108
1109                 /*
1110                  * As we didn't require an exact match, the search function
1111                  * may have returned an entry past the end of the page.  Or,
1112                  * we may be referencing a deleted record.  If so, move to
1113                  * the next entry.
1114                  */
1115                 if (cp->indx == NUM_ENT(cp->page) || IS_CUR_DELETED(dbc))
1116                         if ((ret = __bamc_next(dbc, 0, 0)) != 0)
1117                                 goto err;
1118                 break;
1119         default:
1120                 ret = __db_unknown_flag(dbp->env, "__bamc_get", flags);
1121                 goto err;
1122         }
1123
1124         /*
1125          * We may have moved to an off-page duplicate tree.  Return that
1126          * information to our caller.
1127          */
1128         if (newopd && pgnop != NULL)
1129                 (void)__bam_isopd(dbc, pgnop);
1130
1131 err:    /*
1132          * Regardless of whether we were successful or not, if the cursor
1133          * moved, clear the delete flag, DBcursor->get never references a
1134          * deleted key, if it moved at all.
1135          */
1136         if (F_ISSET(cp, C_DELETED) &&
1137             (cp->pgno != orig_pgno || cp->indx != orig_indx))
1138                 F_CLR(cp, C_DELETED);
1139
1140         return (ret);
1141 }
1142
1143 static int
1144 __bam_get_prev(dbc)
1145         DBC *dbc;
1146 {
1147         BTREE_CURSOR *cp;
1148         DBT key, data;
1149         db_pgno_t pgno;
1150         int ret;
1151
1152         if ((ret = __bamc_prev(dbc)) != 0)
1153                 return (ret);
1154
1155         if (__bam_isopd(dbc, &pgno)) {
1156                 cp = (BTREE_CURSOR *)dbc->internal;
1157                 if ((ret = __dbc_newopd(dbc, pgno, cp->opd, &cp->opd)) != 0)
1158                         return (ret);
1159                 if ((ret = cp->opd->am_get(cp->opd,
1160                     &key, &data, DB_LAST, NULL)) != 0)
1161                         return (ret);
1162         }
1163
1164         return (0);
1165 }
1166
1167 /*
1168  * __bam_bulk -- Return bulk data from a btree.
1169  */
1170 static int
1171 __bam_bulk(dbc, data, flags)
1172         DBC *dbc;
1173         DBT *data;
1174         u_int32_t flags;
1175 {
1176         BKEYDATA *bk;
1177         BOVERFLOW *bo;
1178         BTREE_CURSOR *cp;
1179         PAGE *pg;
1180         db_indx_t *inp, indx, pg_keyoff;
1181         int32_t  *endp, key_off, *offp, *saveoffp;
1182         u_int8_t *dbuf, *dp, *np;
1183         u_int32_t key_size, pagesize, size, space;
1184         int adj, is_key, need_pg, next_key, no_dup, rec_key, ret;
1185
1186         ret = 0;
1187         key_off = 0;
1188         size = 0;
1189         pagesize = dbc->dbp->pgsize;
1190         cp = (BTREE_CURSOR *)dbc->internal;
1191
1192         /*
1193          * dp tracks the beginning of the page in the buffer.
1194          * np is the next place to copy things into the buffer.
1195          * dbuf always stays at the beginning of the buffer.
1196          */
1197         dbuf = data->data;
1198         np = dp = dbuf;
1199
1200         /* Keep track of space that is left.  There is a termination entry */
1201         space = data->ulen;
1202         space -= sizeof(*offp);
1203
1204         /* Build the offset/size table from the end up. */
1205         endp = (int32_t *)((u_int8_t *)dbuf + data->ulen);
1206         endp--;
1207         offp = endp;
1208
1209         key_size = 0;
1210
1211         /*
1212          * Distinguish between BTREE and RECNO.
1213          * There are no keys in RECNO.  If MULTIPLE_KEY is specified
1214          * then we return the record numbers.
1215          * is_key indicates that multiple btree keys are returned.
1216          * rec_key is set if we are returning record numbers.
1217          * next_key is set if we are going after the next key rather than dup.
1218          */
1219         if (dbc->dbtype == DB_BTREE) {
1220                 is_key = LF_ISSET(DB_MULTIPLE_KEY) ? 1: 0;
1221                 rec_key = 0;
1222                 next_key = is_key && LF_ISSET(DB_OPFLAGS_MASK) != DB_NEXT_DUP;
1223                 adj = 2;
1224         } else {
1225                 is_key = 0;
1226                 rec_key = LF_ISSET(DB_MULTIPLE_KEY) ? 1 : 0;
1227                 next_key = LF_ISSET(DB_OPFLAGS_MASK) != DB_NEXT_DUP;
1228                 adj = 1;
1229         }
1230         no_dup = LF_ISSET(DB_OPFLAGS_MASK) == DB_NEXT_NODUP;
1231
1232 next_pg:
1233         indx = cp->indx;
1234         pg = cp->page;
1235
1236         inp = P_INP(dbc->dbp, pg);
1237         /* The current page is not yet in the buffer. */
1238         need_pg = 1;
1239
1240         /*
1241          * Keep track of the offset of the current key on the page.
1242          * If we are returning keys, set it to 0 first so we force
1243          * the copy of the key to the buffer.
1244          */
1245         pg_keyoff = 0;
1246         if (is_key == 0)
1247                 pg_keyoff = inp[indx];
1248
1249         do {
1250                 if (IS_DELETED(dbc->dbp, pg, indx)) {
1251                         if (dbc->dbtype != DB_RECNO)
1252                                 continue;
1253
1254                         cp->recno++;
1255                         /*
1256                          * If we are not returning recnos then we
1257                          * need to fill in every slot so the user
1258                          * can calculate the record numbers.
1259                          */
1260                         if (rec_key != 0)
1261                                 continue;
1262
1263                         space -= 2 * sizeof(*offp);
1264                         /* Check if space as underflowed. */
1265                         if (space > data->ulen)
1266                                 goto back_up;
1267
1268                         /* Just mark the empty recno slots. */
1269                         *offp-- = 0;
1270                         *offp-- = 0;
1271                         continue;
1272                 }
1273
1274                 /*
1275                  * Check to see if we have a new key.
1276                  * If so, then see if we need to put the
1277                  * key on the page.  If its already there
1278                  * then we just point to it.
1279                  */
1280                 if (is_key && pg_keyoff != inp[indx]) {
1281                         bk = GET_BKEYDATA(dbc->dbp, pg, indx);
1282                         if (B_TYPE(bk->type) == B_OVERFLOW) {
1283                                 bo = (BOVERFLOW *)bk;
1284                                 size = key_size = bo->tlen;
1285                                 if (key_size > space)
1286                                         goto get_key_space;
1287                                 if ((ret = __bam_bulk_overflow(dbc,
1288                                     bo->tlen, bo->pgno, np)) != 0)
1289                                         return (ret);
1290                                 space -= key_size;
1291                                 key_off = (int32_t)(np - dbuf);
1292                                 np += key_size;
1293                         } else {
1294                                 if (need_pg) {
1295                                         dp = np;
1296                                         size = pagesize - HOFFSET(pg);
1297                                         if (space < size) {
1298 get_key_space:
1299                                                 /* Nothing added, then error. */
1300                                                 if (offp == endp) {
1301                                                         data->size = (u_int32_t)
1302                                                             DB_ALIGN(size +
1303                                                             pagesize, 1024);
1304                                                         return
1305                                                             (DB_BUFFER_SMALL);
1306                                                 }
1307                                                 /*
1308                                                  * We need to back up to the
1309                                                  * last record put into the
1310                                                  * buffer so that it is
1311                                                  * CURRENT.
1312                                                  */
1313                                                 if (indx != 0)
1314                                                         indx -= P_INDX;
1315                                                 else {
1316                                                         if ((ret =
1317                                                             __bam_get_prev(
1318                                                             dbc)) != 0)
1319                                                                 return (ret);
1320                                                         indx = cp->indx;
1321                                                         pg = cp->page;
1322                                                 }
1323                                                 break;
1324                                         }
1325                                         /*
1326                                          * Move the data part of the page
1327                                          * to the buffer.
1328                                          */
1329                                         memcpy(dp,
1330                                            (u_int8_t *)pg + HOFFSET(pg), size);
1331                                         need_pg = 0;
1332                                         space -= size;
1333                                         np += size;
1334                                 }
1335                                 key_size = bk->len;
1336                                 key_off = (int32_t)((inp[indx] - HOFFSET(pg))
1337                                     + (dp - dbuf) + SSZA(BKEYDATA, data));
1338                                 pg_keyoff = inp[indx];
1339                         }
1340                 }
1341
1342                 /*
1343                  * Reserve space for the pointers and sizes.
1344                  * Either key/data pair or just for a data item.
1345                  */
1346                 space -= (is_key ? 4 : 2) * sizeof(*offp);
1347                 if (rec_key)
1348                         space -= sizeof(*offp);
1349
1350                 /* Check to see if space has underflowed. */
1351                 if (space > data->ulen)
1352                         goto back_up;
1353
1354                 /*
1355                  * Determine if the next record is in the
1356                  * buffer already or if it needs to be copied in.
1357                  * If we have an off page dup, then copy as many
1358                  * as will fit into the buffer.
1359                  */
1360                 bk = GET_BKEYDATA(dbc->dbp, pg, indx + adj - 1);
1361                 if (B_TYPE(bk->type) == B_DUPLICATE) {
1362                         bo = (BOVERFLOW *)bk;
1363                         if (is_key) {
1364                                 *offp-- = (int32_t)key_off;
1365                                 *offp-- = (int32_t)key_size;
1366                         }
1367                         /*
1368                          * We pass the offset of the current key.
1369                          * On return we check to see if offp has
1370                          * moved to see if any data fit.
1371                          */
1372                         saveoffp = offp;
1373                         if ((ret = __bam_bulk_duplicates(dbc, bo->pgno,
1374                             dbuf, is_key ? offp + P_INDX : NULL,
1375                             &offp, &np, &space, no_dup)) != 0) {
1376                                 if (ret == DB_BUFFER_SMALL) {
1377                                         size = space;
1378                                         space = 0;
1379                                         /* If nothing was added, then error. */
1380                                         if (offp == saveoffp) {
1381                                                 offp += 2;
1382                                                 goto back_up;
1383                                         }
1384                                         goto get_space;
1385                                 }
1386                                 return (ret);
1387                         }
1388                 } else if (B_TYPE(bk->type) == B_OVERFLOW) {
1389                         bo = (BOVERFLOW *)bk;
1390                         size = bo->tlen;
1391                         if (size > space)
1392                                 goto back_up;
1393                         if ((ret =
1394                             __bam_bulk_overflow(dbc,
1395                                 bo->tlen, bo->pgno, np)) != 0)
1396                                 return (ret);
1397                         space -= size;
1398                         if (is_key) {
1399                                 *offp-- = (int32_t)key_off;
1400                                 *offp-- = (int32_t)key_size;
1401                         } else if (rec_key)
1402                                 *offp-- = (int32_t)cp->recno;
1403                         *offp-- = (int32_t)(np - dbuf);
1404                         np += size;
1405                         *offp-- = (int32_t)size;
1406                 } else {
1407                         if (need_pg) {
1408                                 dp = np;
1409                                 size = pagesize - HOFFSET(pg);
1410                                 if (space < size) {
1411 back_up:
1412                                         /*
1413                                          * Back up the index so that the
1414                                          * last record in the buffer is CURRENT
1415                                          */
1416                                         if (indx >= adj)
1417                                                 indx -= adj;
1418                                         else {
1419                                                 if ((ret =
1420                                                     __bam_get_prev(dbc)) != 0 &&
1421                                                     ret != DB_NOTFOUND)
1422                                                         return (ret);
1423                                                 indx = cp->indx;
1424                                                 pg = cp->page;
1425                                         }
1426                                         if (dbc->dbtype == DB_RECNO)
1427                                                 cp->recno--;
1428 get_space:
1429                                         /*
1430                                          * See if we put anything in the
1431                                          * buffer or if we are doing a DBP->get
1432                                          * did we get all of the data.
1433                                          */
1434                                         if (offp >=
1435                                             (is_key ? &endp[-1] : endp) ||
1436                                             F_ISSET(dbc, DBC_FROM_DB_GET)) {
1437                                                 data->size = (u_int32_t)
1438                                                     DB_ALIGN(size +
1439                                                     data->ulen - space, 1024);
1440                                                 return (DB_BUFFER_SMALL);
1441                                         }
1442                                         break;
1443                                 }
1444                                 memcpy(dp, (u_int8_t *)pg + HOFFSET(pg), size);
1445                                 need_pg = 0;
1446                                 space -= size;
1447                                 np += size;
1448                         }
1449                         /*
1450                          * Add the offsets and sizes to the end of the buffer.
1451                          * First add the key info then the data info.
1452                          */
1453                         if (is_key) {
1454                                 *offp-- = (int32_t)key_off;
1455                                 *offp-- = (int32_t)key_size;
1456                         } else if (rec_key)
1457                                 *offp-- = (int32_t)cp->recno;
1458                         *offp-- = (int32_t)((inp[indx + adj - 1] - HOFFSET(pg))
1459                             + (dp - dbuf) + SSZA(BKEYDATA, data));
1460                         *offp-- = bk->len;
1461                 }
1462                 if (dbc->dbtype == DB_RECNO)
1463                         cp->recno++;
1464                 else if (no_dup) {
1465                         while (indx + adj < NUM_ENT(pg) &&
1466                             pg_keyoff == inp[indx + adj])
1467                                 indx += adj;
1468                 }
1469         /*
1470          * Stop when we either run off the page or we move to the next key and
1471          * we are not returning multiple keys.
1472          */
1473         } while ((indx += adj) < NUM_ENT(pg) &&
1474             (next_key || pg_keyoff == inp[indx]));
1475
1476         /* If we are off the page then try to the next page. */
1477         if (ret == 0 && next_key && indx >= NUM_ENT(pg)) {
1478                 cp->indx = indx;
1479                 ret = __bamc_next(dbc, 0, 1);
1480                 if (ret == 0)
1481                         goto next_pg;
1482                 if (ret != DB_NOTFOUND)
1483                         return (ret);
1484         }
1485
1486         /*
1487          * If we did a DBP->get we must error if we did not return
1488          * all the data for the current key because there is
1489          * no way to know if we did not get it all, nor any
1490          * interface to fetch the balance.
1491          */
1492
1493         if (ret == 0 && indx < pg->entries &&
1494             F_ISSET(dbc, DBC_TRANSIENT) && pg_keyoff == inp[indx]) {
1495                 data->size = (data->ulen - space) + size;
1496                 return (DB_BUFFER_SMALL);
1497         }
1498         /*
1499          * Must leave the index pointing at the last record fetched.
1500          * If we are not fetching keys, we may have stepped to the
1501          * next key.
1502          */
1503         if (ret == DB_BUFFER_SMALL || next_key || pg_keyoff == inp[indx])
1504                 cp->indx = indx;
1505         else
1506                 cp->indx = indx - P_INDX;
1507
1508         if (rec_key == 1)
1509                 *offp = RECNO_OOB;
1510         else
1511                 *offp = -1;
1512         return (0);
1513 }
1514
1515 /*
1516  * __bam_bulk_overflow --
1517  *      Dump overflow record into the buffer.
1518  *      The space requirements have already been checked.
1519  * PUBLIC: int __bam_bulk_overflow
1520  * PUBLIC:    __P((DBC *, u_int32_t, db_pgno_t, u_int8_t *));
1521  */
1522 int
1523 __bam_bulk_overflow(dbc, len, pgno, dp)
1524         DBC *dbc;
1525         u_int32_t len;
1526         db_pgno_t pgno;
1527         u_int8_t *dp;
1528 {
1529         DBT dbt;
1530
1531         memset(&dbt, 0, sizeof(dbt));
1532         F_SET(&dbt, DB_DBT_USERMEM);
1533         dbt.ulen = len;
1534         dbt.data = (void *)dp;
1535         return (__db_goff(dbc, &dbt, len, pgno, NULL, NULL));
1536 }
1537
1538 /*
1539  * __bam_bulk_duplicates --
1540  *      Put as many off page duplicates as will fit into the buffer.
1541  * This routine will adjust the cursor to reflect the position in
1542  * the overflow tree.
1543  * PUBLIC: int __bam_bulk_duplicates __P((DBC *,
1544  * PUBLIC:       db_pgno_t, u_int8_t *, int32_t *,
1545  * PUBLIC:       int32_t **, u_int8_t **, u_int32_t *, int));
1546  */
1547 int
1548 __bam_bulk_duplicates(dbc, pgno, dbuf, keyoff, offpp, dpp, spacep, no_dup)
1549         DBC *dbc;
1550         db_pgno_t pgno;
1551         u_int8_t *dbuf;
1552         int32_t *keyoff, **offpp;
1553         u_int8_t **dpp;
1554         u_int32_t *spacep;
1555         int no_dup;
1556 {
1557         BKEYDATA *bk;
1558         BOVERFLOW *bo;
1559         BTREE_CURSOR *cp;
1560         DB *dbp;
1561         DBC *opd;
1562         DBT key, data;
1563         PAGE *pg;
1564         db_indx_t indx, *inp;
1565         int32_t *offp;
1566         u_int32_t pagesize, size, space;
1567         u_int8_t *dp, *np;
1568         int first, need_pg, ret, t_ret;
1569
1570         ret = 0;
1571
1572         dbp = dbc->dbp;
1573         cp = (BTREE_CURSOR *)dbc->internal;
1574         opd = cp->opd;
1575
1576         if (opd == NULL) {
1577                 if ((ret = __dbc_newopd(dbc, pgno, NULL, &opd)) != 0)
1578                         return (ret);
1579                 cp->opd = opd;
1580                 if ((ret = opd->am_get(opd,
1581                     &key, &data, DB_FIRST, NULL)) != 0)
1582                         goto close_opd;
1583         }
1584
1585         pagesize = opd->dbp->pgsize;
1586         cp = (BTREE_CURSOR *)opd->internal;
1587         space = *spacep;
1588         /* Get current offset slot. */
1589         offp = *offpp;
1590
1591         /*
1592          * np is the next place to put data.
1593          * dp is the beginning of the current page in the buffer.
1594          */
1595         np = dp = *dpp;
1596         first = 1;
1597         indx = cp->indx;
1598
1599         do {
1600                 /* Fetch the current record.  No initial move. */
1601                 if ((ret = __bamc_next(opd, 0, 0)) != 0)
1602                         break;
1603                 pg = cp->page;
1604                 indx = cp->indx;
1605                 inp = P_INP(dbp, pg);
1606                 /* We need to copy the page to the buffer. */
1607                 need_pg = 1;
1608
1609                 do {
1610                         if (IS_DELETED(dbp, pg, indx))
1611                                 goto contin;
1612                         bk = GET_BKEYDATA(dbp, pg, indx);
1613                         space -= 2 * sizeof(*offp);
1614                         /* Allocate space for key if needed. */
1615                         if (first == 0 && keyoff != NULL)
1616                                 space -= 2 * sizeof(*offp);
1617
1618                         /* Did space underflow? */
1619                         if (space > *spacep) {
1620                                 ret = DB_BUFFER_SMALL;
1621                                 if (first == 1) {
1622                                         /* Get the absolute value. */
1623                                         space = -(int32_t)space;
1624                                         space = *spacep + space;
1625                                         if (need_pg)
1626                                                 space += pagesize - HOFFSET(pg);
1627                                 }
1628                                 break;
1629                         }
1630                         if (B_TYPE(bk->type) == B_OVERFLOW) {
1631                                 bo = (BOVERFLOW *)bk;
1632                                 size = bo->tlen;
1633                                 if (size > space) {
1634                                         ret = DB_BUFFER_SMALL;
1635                                         space = *spacep + size;
1636                                         break;
1637                                 }
1638                                 if (first == 0 && keyoff != NULL) {
1639                                         *offp-- = keyoff[0];
1640                                         *offp-- = keyoff[-1];
1641                                 }
1642                                 if ((ret = __bam_bulk_overflow(dbc,
1643                                     bo->tlen, bo->pgno, np)) != 0)
1644                                         return (ret);
1645                                 space -= size;
1646                                 *offp-- = (int32_t)(np - dbuf);
1647                                 np += size;
1648                         } else {
1649                                 if (need_pg) {
1650                                         dp = np;
1651                                         size = pagesize - HOFFSET(pg);
1652                                         if (space < size) {
1653                                                 ret = DB_BUFFER_SMALL;
1654                                                 /* Return space required. */
1655                                                 space = *spacep + size;
1656                                                 break;
1657                                         }
1658                                         memcpy(dp,
1659                                             (u_int8_t *)pg + HOFFSET(pg), size);
1660                                         need_pg = 0;
1661                                         space -= size;
1662                                         np += size;
1663                                 }
1664                                 if (first == 0 && keyoff != NULL) {
1665                                         *offp-- = keyoff[0];
1666                                         *offp-- = keyoff[-1];
1667                                 }
1668                                 size = bk->len;
1669                                 *offp-- = (int32_t)((inp[indx] - HOFFSET(pg))
1670                                     + (dp - dbuf) + SSZA(BKEYDATA, data));
1671                         }
1672                         *offp-- = (int32_t)size;
1673                         first = 0;
1674                         if (no_dup)
1675                                 break;
1676 contin:
1677                         indx++;
1678                         if (opd->dbtype == DB_RECNO)
1679                                 cp->recno++;
1680                 } while (indx < NUM_ENT(pg));
1681                 if (no_dup)
1682                         break;
1683                 cp->indx = indx;
1684
1685         } while (ret == 0);
1686
1687         /* Return the updated information. */
1688         *spacep = space;
1689         *offpp = offp;
1690         *dpp = np;
1691
1692         /*
1693          * If we ran out of space back up the pointer.
1694          * If we did not return any dups or reached the end, close the opd.
1695          */
1696         if (ret == DB_BUFFER_SMALL) {
1697                 if (opd->dbtype == DB_RECNO) {
1698                         if (--cp->recno == 0)
1699                                 goto close_opd;
1700                 } else if (indx != 0)
1701                         cp->indx--;
1702                 else {
1703                         t_ret = __bamc_prev(opd);
1704                         if (t_ret == DB_NOTFOUND)
1705                                 goto close_opd;
1706                         if (t_ret != 0)
1707                                 ret = t_ret;
1708                 }
1709         } else if (keyoff == NULL && ret == DB_NOTFOUND) {
1710                 cp->indx--;
1711                 if (opd->dbtype == DB_RECNO)
1712                         --cp->recno;
1713         } else if (indx == 0 || ret == DB_NOTFOUND) {
1714 close_opd:
1715                 if (ret == DB_NOTFOUND)
1716                         ret = 0;
1717                 if ((t_ret = __dbc_close(opd)) != 0 && ret == 0)
1718                         ret = t_ret;
1719                 ((BTREE_CURSOR *)dbc->internal)->opd = NULL;
1720         }
1721         if (ret == DB_NOTFOUND)
1722                 ret = 0;
1723
1724         return (ret);
1725 }
1726
1727 /*
1728  * __bam_getbothc --
1729  *      Search for a matching data item on a join.
1730  */
1731 static int
1732 __bam_getbothc(dbc, data)
1733         DBC *dbc;
1734         DBT *data;
1735 {
1736         BTREE_CURSOR *cp;
1737         DB *dbp;
1738         DB_MPOOLFILE *mpf;
1739         int cmp, exact, ret;
1740
1741         dbp = dbc->dbp;
1742         mpf = dbp->mpf;
1743         cp = (BTREE_CURSOR *)dbc->internal;
1744
1745         /*
1746          * Acquire the current page.  We have at least a read-lock
1747          * already.  The caller may have set DB_RMW asking for a
1748          * write lock, but upgrading to a write lock has no better
1749          * chance of succeeding now instead of later, so don't try.
1750          */
1751         if ((ret = __memp_fget(mpf, &cp->pgno,
1752              dbc->thread_info, dbc->txn, 0, &cp->page)) != 0)
1753                 return (ret);
1754
1755         /*
1756          * An off-page duplicate cursor.  Search the remaining duplicates
1757          * for one which matches (do a normal btree search, then verify
1758          * that the retrieved record is greater than the original one).
1759          */
1760         if (F_ISSET(dbc, DBC_OPD)) {
1761                 /*
1762                  * Check to make sure the desired item comes strictly after
1763                  * the current position;  if it doesn't, return DB_NOTFOUND.
1764                  */
1765                 if ((ret = __bam_cmp(dbc, data, cp->page, cp->indx,
1766                     dbp->dup_compare == NULL ? __bam_defcmp : dbp->dup_compare,
1767                     &cmp)) != 0)
1768                         return (ret);
1769
1770                 if (cmp <= 0)
1771                         return (DB_NOTFOUND);
1772
1773                 /* Discard the current page, we're going to do a full search. */
1774                 if ((ret = __memp_fput(mpf,
1775                      dbc->thread_info, cp->page, dbc->priority)) != 0)
1776                         return (ret);
1777                 cp->page = NULL;
1778
1779                 return (__bamc_search(dbc,
1780                     PGNO_INVALID, data, DB_GET_BOTH, &exact));
1781         }
1782
1783         /*
1784          * We're doing a DBC->get(DB_GET_BOTHC) and we're already searching
1785          * a set of on-page duplicates (either sorted or unsorted).  Continue
1786          * a linear search from after the current position.
1787          *
1788          * (Note that we could have just finished a "set" of one duplicate,
1789          * i.e. not a duplicate at all, but the following check will always
1790          * return DB_NOTFOUND in this case, which is the desired behavior.)
1791          */
1792         if (cp->indx + P_INDX >= NUM_ENT(cp->page) ||
1793             !IS_DUPLICATE(dbc, cp->indx, cp->indx + P_INDX))
1794                 return (DB_NOTFOUND);
1795         cp->indx += P_INDX;
1796
1797         return (__bam_getboth_finddatum(dbc, data, DB_GET_BOTH));
1798 }
1799
1800 #ifdef HAVE_COMPRESSION
1801 /*
1802  * __bam_getlte --
1803  *      Search for the largest entry <= key/data - used by compression.
1804  *
1805  *      data == NULL indicates the DB_SET_LTE flag
1806  *      data != NULL indicates the DB_GET_BOTH_LTE flag
1807  *
1808  *      Only works for a primary cursor - not an OPD cursor. Handles the
1809  *      OPD manipulation as well - no need to return to the caller to
1810  *      perform more OPD movements.
1811  */
1812 static int
1813 __bam_getlte(dbc, key, data)
1814         DBC *dbc;
1815         DBT *key, *data;
1816 {
1817         BTREE_CURSOR *cp, *ocp;
1818         DB *dbp;
1819         db_pgno_t pgno;
1820         int exact, ret;
1821
1822         dbp = dbc->dbp;
1823         cp = (BTREE_CURSOR *)dbc->internal;
1824
1825         /* Begin by searching for the key */
1826         ret = __bamc_search(dbc, PGNO_INVALID, key, DB_SET_RANGE, &exact);
1827         if (ret == DB_NOTFOUND)
1828                 goto find_last;
1829         if (ret != 0)
1830                 goto end;
1831
1832         if (cp->indx == NUM_ENT(cp->page) || IS_CUR_DELETED(dbc)) {
1833                 /*
1834                  * Move to the next entry if we're past the end of the
1835                  * page or on a deleted entry.
1836                  */
1837                 ret = __bamc_next(dbc, 0, 0);
1838                 if (ret == DB_NOTFOUND)
1839                         goto find_last;
1840                 if (ret != 0)
1841                         goto end;
1842
1843                 /* Check if we're still on the correct key */
1844                 if ((ret = __bam_cmp(dbc, key, cp->page, cp->indx,
1845                     ((BTREE*)dbp->bt_internal)->bt_compare, &exact)) != 0)
1846                         goto end;
1847                 exact = (exact == 0);
1848         }
1849
1850         if (exact == 0) {
1851                 ret = __bam_get_prev(dbc);
1852                 goto end;
1853         }
1854
1855         if (__bam_isopd(dbc, &pgno)) {
1856                 /*
1857                  * We want to do unusual things with off-page duplicates, so
1858                  * deal with them here rather than returning to handle them.
1859                  */
1860                 if ((ret = __dbc_newopd(dbc, pgno, cp->opd, &cp->opd)) != 0)
1861                         goto end;
1862
1863                 /* Search for the correct duplicate */
1864                 ret = __bamc_search(cp->opd, PGNO_INVALID, data,
1865                         data == NULL ? DB_FIRST : DB_SET_RANGE, &exact);
1866                 if (ret == DB_NOTFOUND)
1867                         goto find_last_dup;
1868                 if (ret != 0)
1869                         goto end;
1870
1871                 ocp = (BTREE_CURSOR *)cp->opd->internal;
1872                 if (ocp->indx == NUM_ENT(ocp->page) ||
1873                     IS_CUR_DELETED(cp->opd)) {
1874                         /*
1875                          * Move to the next entry if we're past the end of the
1876                          * page or on a deleted entry.
1877                          */
1878                         ret = __bamc_next(cp->opd, 0, 0);
1879                         if (ret == DB_NOTFOUND)
1880                                 goto find_last_dup;
1881                         if (ret != 0)
1882                                 goto end;
1883
1884                         if (data != NULL) {
1885                                 /* Check if we're still on the correct data */
1886                                 if ((ret = __bam_cmp(
1887                                             dbc, data, ocp->page, ocp->indx,
1888                                             dbp->dup_compare, &exact)) != 0)
1889                                         goto end;
1890                                 exact = (exact == 0);
1891                         } else
1892                                 exact = 1;
1893                 }
1894
1895                 if (exact == 0) {
1896                         /* Move to the previous entry */
1897                         ret = __bamc_prev(cp->opd);
1898                         if (ret == DB_NOTFOUND) {
1899                                 if ((ret = __dbc_close(cp->opd)) != 0)
1900                                         goto end;
1901                                 cp->opd = NULL;
1902                                 ret = __bam_get_prev(dbc);
1903                         }
1904                 }
1905         } else if (data != NULL) {
1906                 /*
1907                  * If we got an exact match with on-page duplicates, we need to
1908                  * search in them.
1909                  */
1910                 ret = __bam_getboth_finddatum(dbc, data, DB_GET_BOTH_RANGE);
1911                 if (ret == DB_NOTFOUND)
1912                         exact = 0;
1913                 else if (ret != 0)
1914                         goto end;
1915                 else {
1916                         /* Check if we're still on the correct data */
1917                         if ((ret = __bam_cmp(dbc, data, cp->page,
1918                             cp->indx + O_INDX, dbp->dup_compare, &exact)) != 0)
1919                                 goto end;
1920                         exact = (exact == 0);
1921                 }
1922
1923                 if (exact == 0) {
1924                         ret = __bam_get_prev(dbc);
1925                 }
1926         }
1927
1928  end:
1929         return (ret);
1930
1931  find_last:
1932         if ((ret = __bamc_search(
1933             dbc, PGNO_INVALID, NULL, DB_LAST, &exact)) != 0)
1934                 return (ret);
1935
1936         if (__bam_isopd(dbc, &pgno)) {
1937                 if ((ret = __dbc_newopd(dbc, pgno, cp->opd, &cp->opd)) != 0)
1938                         return (ret);
1939  find_last_dup:
1940                 if ((ret = __bamc_search(
1941                     cp->opd, PGNO_INVALID, NULL, DB_LAST, &exact)) != 0)
1942                         return (ret);
1943         }
1944
1945         return (ret);
1946 }
1947 #endif
1948
1949 /*
1950  * __bam_getboth_finddatum --
1951  *      Find a matching on-page data item.
1952  */
1953 static int
1954 __bam_getboth_finddatum(dbc, data, flags)
1955         DBC *dbc;
1956         DBT *data;
1957         u_int32_t flags;
1958 {
1959         BTREE_CURSOR *cp;
1960         DB *dbp;
1961         db_indx_t base, lim, top;
1962         int cmp, ret;
1963
1964         dbp = dbc->dbp;
1965         cp = (BTREE_CURSOR *)dbc->internal;
1966
1967         cmp = 0;
1968
1969         /*
1970          * Called (sometimes indirectly) from DBC->get to search on-page data
1971          * item(s) for a matching value.  If the original flag was DB_GET_BOTH
1972          * or DB_GET_BOTH_RANGE, the cursor is set to the first undeleted data
1973          * item for the key.  If the original flag was DB_GET_BOTHC, the cursor
1974          * argument is set to the first data item we can potentially return.
1975          * In both cases, there may or may not be additional duplicate data
1976          * items to search.
1977          *
1978          * If the duplicates are not sorted, do a linear search.
1979          */
1980         if (dbp->dup_compare == NULL) {
1981                 for (;; cp->indx += P_INDX) {
1982                         if (!IS_CUR_DELETED(dbc)) {
1983                                 if ((ret = __bam_cmp(
1984                                     dbc, data, cp->page, cp->indx + O_INDX,
1985                                     __bam_defcmp, &cmp)) != 0)
1986                                         return (ret);
1987                                 if (cmp == 0)
1988                                         return (0);
1989                         }
1990
1991                         if (cp->indx + P_INDX >= NUM_ENT(cp->page) ||
1992                             !IS_DUPLICATE(dbc, cp->indx, cp->indx + P_INDX))
1993                                 break;
1994                 }
1995                 return (DB_NOTFOUND);
1996         }
1997
1998         /*
1999          * If the duplicates are sorted, do a binary search.  The reason for
2000          * this is that large pages and small key/data pairs result in large
2001          * numbers of on-page duplicates before they get pushed off-page.
2002          *
2003          * Find the top and bottom of the duplicate set.  Binary search
2004          * requires at least two items, don't loop if there's only one.
2005          */
2006         for (base = top = cp->indx; top < NUM_ENT(cp->page); top += P_INDX)
2007                 if (!IS_DUPLICATE(dbc, cp->indx, top))
2008                         break;
2009         if (base == (top - P_INDX)) {
2010                 if  ((ret = __bam_cmp(dbc, data, cp->page,
2011                     cp->indx + O_INDX, dbp->dup_compare, &cmp)) != 0)
2012                         return (ret);
2013                 if (cmp == 0 || (cmp < 0 && flags == DB_GET_BOTH_RANGE))
2014                         return (0);
2015                 cp->indx = top;
2016                 return DB_NOTFOUND;
2017         }
2018
2019         for (lim = (top - base) / (db_indx_t)P_INDX; lim != 0; lim >>= 1) {
2020                 cp->indx = base + ((lim >> 1) * P_INDX);
2021                 if ((ret = __bam_cmp(dbc, data, cp->page,
2022                     cp->indx + O_INDX, dbp->dup_compare, &cmp)) != 0)
2023                         return (ret);
2024                 if (cmp == 0) {
2025                         /*
2026                          * XXX
2027                          * No duplicate duplicates in sorted duplicate sets,
2028                          * so there can be only one.
2029                          */
2030                         if (!IS_CUR_DELETED(dbc))
2031                                 return (0);
2032                         break;
2033                 }
2034                 if (cmp > 0) {
2035                         base = cp->indx + P_INDX;
2036                         --lim;
2037                 }
2038         }
2039
2040         /* No match found; if we're looking for an exact match, we're done. */
2041         if (flags == DB_GET_BOTH)
2042                 return (DB_NOTFOUND);
2043
2044         /*
2045          * Base is the smallest index greater than the data item, may be zero
2046          * or a last + O_INDX index, and may be deleted.  Find an undeleted
2047          * item.
2048          */
2049         cp->indx = base;
2050         while (cp->indx < top && IS_CUR_DELETED(dbc))
2051                 cp->indx += P_INDX;
2052         return (cp->indx < top ? 0 : DB_NOTFOUND);
2053 }
2054
2055 /*
2056  * __bamc_put --
2057  *      Put using a cursor.
2058  */
2059 static int
2060 __bamc_put(dbc, key, data, flags, pgnop)
2061         DBC *dbc;
2062         DBT *key, *data;
2063         u_int32_t flags;
2064         db_pgno_t *pgnop;
2065 {
2066         BTREE *t;
2067         BTREE_CURSOR *cp;
2068         DB *dbp;
2069         DBT dbt;
2070         DB_MPOOLFILE *mpf;
2071         db_pgno_t root_pgno;
2072         int cmp, exact, own, ret, stack;
2073         u_int32_t iiop;
2074         void *arg;
2075
2076         dbp = dbc->dbp;
2077         mpf = dbp->mpf;
2078         cp = (BTREE_CURSOR *)dbc->internal;
2079         root_pgno = cp->root;
2080
2081 split:  ret = stack = 0;
2082         switch (flags) {
2083         case DB_CURRENT:
2084                 if (F_ISSET(cp, C_DELETED))
2085                         return (DB_NOTFOUND);
2086                 /* FALLTHROUGH */
2087         case DB_AFTER:
2088         case DB_BEFORE:
2089                 iiop = flags;
2090                 own = 1;
2091
2092                 /* Acquire the current page with a write lock. */
2093                 ACQUIRE_WRITE_LOCK(dbc, ret);
2094                 if (ret != 0)
2095                         goto err;
2096                 if (cp->page == NULL && (ret = __memp_fget(mpf, &cp->pgno,
2097                     dbc->thread_info, dbc->txn, 0, &cp->page)) != 0)
2098                         goto err;
2099                 break;
2100         case DB_KEYFIRST:
2101         case DB_KEYLAST:
2102         case DB_NODUPDATA:
2103         case DB_NOOVERWRITE:
2104         case DB_OVERWRITE_DUP:
2105                 own = 0;
2106                 /*
2107                  * Searching off-page, sorted duplicate tree: do a tree search
2108                  * for the correct item; __bamc_search returns the smallest
2109                  * slot greater than the key, use it.
2110                  *
2111                  * See comment below regarding where we can start the search.
2112                  */
2113                 if (F_ISSET(dbc, DBC_OPD)) {
2114                         if ((ret = __bamc_search(dbc,
2115                             F_ISSET(cp, C_RECNUM) ? cp->root : root_pgno,
2116                             data, flags, &exact)) != 0)
2117                                 goto err;
2118                         stack = 1;
2119
2120                         /* Disallow "sorted" duplicate duplicates. */
2121                         if (exact != 0) {
2122                                 if (flags == DB_OVERWRITE_DUP ||
2123                                     IS_DELETED(dbp, cp->page, cp->indx)) {
2124                                         iiop = DB_CURRENT;
2125                                         break;
2126                                 }
2127                                 ret = __db_duperr(dbp, flags);
2128                                 goto err;
2129                         }
2130                         iiop = DB_BEFORE;
2131                         break;
2132                 }
2133
2134                 /*
2135                  * Searching a btree.
2136                  *
2137                  * If we've done a split, we can start the search from the
2138                  * parent of the split page, which __bam_split returned
2139                  * for us in root_pgno, unless we're in a Btree with record
2140                  * numbering.  In that case, we'll need the true root page
2141                  * in order to adjust the record count.
2142                  */
2143                 if ((ret = __bamc_search(dbc,
2144                     F_ISSET(cp, C_RECNUM) ? cp->root : root_pgno, key,
2145                     flags == DB_KEYFIRST || dbp->dup_compare != NULL ?
2146                     DB_KEYFIRST : DB_KEYLAST, &exact)) != 0)
2147                         goto err;
2148                 stack = 1;
2149
2150                 /*
2151                  * If we don't have an exact match, __bamc_search returned
2152                  * the smallest slot greater than the key, use it.
2153                  */
2154                 if (!exact) {
2155                         iiop = DB_KEYFIRST;
2156                         break;
2157
2158                 /*
2159                  * Check for NOOVERWRITE.  It is possible that there
2160                  * is a key with an empty duplicate page attached.
2161                  */
2162                 } else if (flags == DB_NOOVERWRITE && !IS_CUR_DELETED(dbc)) {
2163                         if (pgnop != NULL && __bam_isopd(dbc, pgnop))
2164                                 ret = __bam_opd_exists(dbc, *pgnop);
2165                         else
2166                                 ret = DB_KEYEXIST;
2167                         if (ret != 0)
2168                                 goto err;
2169                 }
2170
2171                 /*
2172                  * If duplicates aren't supported, replace the current item.
2173                  */
2174                 if (!F_ISSET(dbp, DB_AM_DUP)) {
2175                         iiop = DB_CURRENT;
2176                         break;
2177                 }
2178
2179                 /*
2180                  * If we find a matching entry, it may be an off-page duplicate
2181                  * tree.  Return the page number to our caller, we need a new
2182                  * cursor.
2183                  */
2184                 if (pgnop != NULL && __bam_isopd(dbc, pgnop))
2185                         goto done;
2186
2187                 /* If the duplicates aren't sorted, move to the right slot. */
2188                 if (dbp->dup_compare == NULL) {
2189                         if (flags == DB_KEYFIRST)
2190                                 iiop = DB_BEFORE;
2191                         else
2192                                 for (;; cp->indx += P_INDX)
2193                                         if (cp->indx + P_INDX >=
2194                                             NUM_ENT(cp->page) ||
2195                                             !IS_DUPLICATE(dbc, cp->indx,
2196                                             cp->indx + P_INDX)) {
2197                                                 iiop = DB_AFTER;
2198                                                 break;
2199                                         }
2200                         break;
2201                 }
2202
2203                 /*
2204                  * We know that we're looking at the first of a set of sorted
2205                  * on-page duplicates.  Walk the list to find the right slot.
2206                  */
2207                 for (;; cp->indx += P_INDX) {
2208                         if ((ret = __bam_cmp(dbc, data, cp->page,
2209                             cp->indx + O_INDX, dbp->dup_compare, &cmp)) != 0)
2210                                 goto err;
2211                         if (cmp < 0) {
2212                                 iiop = DB_BEFORE;
2213                                 break;
2214                         }
2215
2216                         /* Disallow "sorted" duplicate duplicates. */
2217                         if (cmp == 0) {
2218                                 if (flags == DB_OVERWRITE_DUP ||
2219                                     IS_DELETED(dbp, cp->page, cp->indx)) {
2220                                         iiop = DB_CURRENT;
2221                                         break;
2222                                 }
2223                                 ret = __db_duperr(dbp, flags);
2224                                 goto err;
2225                         }
2226
2227                         if (cp->indx + P_INDX >= NUM_ENT(cp->page) ||
2228                             P_INP(dbp, ((PAGE *)cp->page))[cp->indx] !=
2229                             P_INP(dbp, ((PAGE *)cp->page))[cp->indx + P_INDX]) {
2230                                 iiop = DB_AFTER;
2231                                 break;
2232                         }
2233                 }
2234                 break;
2235         default:
2236                 ret = __db_unknown_flag(dbp->env, "__bamc_put", flags);
2237                 goto err;
2238         }
2239
2240         switch (ret = __bam_iitem(dbc, key, data, iiop, 0)) {
2241         case 0:
2242                 break;
2243         case DB_NEEDSPLIT:
2244                 /*
2245                  * To split, we need a key for the page.  Either use the key
2246                  * argument or get a copy of the key from the page.
2247                  */
2248                 if (flags == DB_AFTER ||
2249                     flags == DB_BEFORE || flags == DB_CURRENT) {
2250                         memset(&dbt, 0, sizeof(DBT));
2251                         if ((ret = __db_ret(dbc, cp->page, 0, &dbt,
2252                             &dbc->my_rkey.data, &dbc->my_rkey.ulen)) != 0)
2253                                 goto err;
2254                         arg = &dbt;
2255                 } else
2256                         arg = F_ISSET(dbc, DBC_OPD) ? data : key;
2257
2258                 /*
2259                  * Discard any locks and pinned pages (the locks are discarded
2260                  * even if we're running with transactions, as they lock pages
2261                  * that we're sorry we ever acquired).  If stack is set and the
2262                  * cursor entries are valid, they point to the same entries as
2263                  * the stack, don't free them twice.
2264                  */
2265                 if (stack)
2266                         ret = __bam_stkrel(dbc, STK_CLRDBC | STK_NOLOCK);
2267                 else
2268                         DISCARD_CUR(dbc, ret);
2269                 if (ret != 0)
2270                         goto err;
2271
2272                 /*
2273                  * SR [#6059]
2274                  * If we do not own a lock on the page any more, then clear the
2275                  * cursor so we don't point at it.  Even if we call __bam_stkrel
2276                  * above we still may have entered the routine with the cursor
2277                  * positioned to a particular record.  This is in the case
2278                  * where C_RECNUM is set.
2279                  */
2280                 if (own == 0) {
2281                         cp->pgno = PGNO_INVALID;
2282                         cp->indx = 0;
2283                 }
2284
2285                 /* Split the tree. */
2286                 if ((ret = __bam_split(dbc, arg, &root_pgno)) != 0)
2287                         return (ret);
2288
2289                 goto split;
2290         default:
2291                 goto err;
2292         }
2293
2294 err:
2295 done:   /*
2296          * If we inserted a key into the first or last slot of the tree,
2297          * remember where it was so we can do it more quickly next time.
2298          * If the tree has record numbers, we need a complete stack so
2299          * that we can adjust the record counts, so skipping the tree search
2300          * isn't possible.  For subdatabases we need to be careful that the
2301          * page does not move from one db to another, so we track its LSN.
2302          *
2303          * If there are duplicates and we are inserting into the last slot,
2304          * the cursor will point _to_ the last item, not after it, which
2305          * is why we subtract P_INDX below.
2306          */
2307
2308         t = dbp->bt_internal;
2309         if (ret == 0 && TYPE(cp->page) == P_LBTREE &&
2310             (flags == DB_KEYFIRST || flags == DB_KEYLAST) &&
2311             !F_ISSET(cp, C_RECNUM) &&
2312             (!F_ISSET(dbp, DB_AM_SUBDB) ||
2313             (LOGGING_ON(dbp->env) && !F_ISSET(dbp, DB_AM_NOT_DURABLE))) &&
2314             ((NEXT_PGNO(cp->page) == PGNO_INVALID &&
2315             cp->indx >= NUM_ENT(cp->page) - P_INDX) ||
2316             (PREV_PGNO(cp->page) == PGNO_INVALID && cp->indx == 0))) {
2317                 t->bt_lpgno = cp->pgno;
2318                 if (F_ISSET(dbp, DB_AM_SUBDB))
2319                         t->bt_llsn = LSN(cp->page);
2320         } else
2321                 t->bt_lpgno = PGNO_INVALID;
2322         /*
2323          * Discard any pages pinned in the tree and their locks, except for
2324          * the leaf page.  Note, the leaf page participated in any stack we
2325          * acquired, and so we have to adjust the stack as necessary.  If
2326          * there was only a single page on the stack, we don't have to free
2327          * further stack pages.
2328          */
2329         if (stack && BT_STK_POP(cp) != NULL)
2330                 (void)__bam_stkrel(dbc, 0);
2331
2332         /*
2333          * Regardless of whether we were successful or not, clear the delete
2334          * flag.  If we're successful, we either moved the cursor or the item
2335          * is no longer deleted.  If we're not successful, then we're just a
2336          * copy, no need to have the flag set.
2337          *
2338          * We may have instantiated off-page duplicate cursors during the put,
2339          * so clear the deleted bit from the off-page duplicate cursor as well.
2340          */
2341         F_CLR(cp, C_DELETED);
2342         if (cp->opd != NULL) {
2343                 cp = (BTREE_CURSOR *)cp->opd->internal;
2344                 F_CLR(cp, C_DELETED);
2345         }
2346
2347         return (ret);
2348 }
2349
2350 /*
2351  * __bamc_rget --
2352  *      Return the record number for a cursor.
2353  *
2354  * PUBLIC: int __bamc_rget __P((DBC *, DBT *));
2355  */
2356 int
2357 __bamc_rget(dbc, data)
2358         DBC *dbc;
2359         DBT *data;
2360 {
2361         BTREE_CURSOR *cp;
2362         DB *dbp;
2363         DBT dbt;
2364         DB_MPOOLFILE *mpf;
2365         db_recno_t recno;
2366         int exact, ret, t_ret;
2367
2368         dbp = dbc->dbp;
2369         mpf = dbp->mpf;
2370         cp = (BTREE_CURSOR *)dbc->internal;
2371
2372         /*
2373          * Get the page with the current item on it.
2374          * Get a copy of the key.
2375          * Release the page, making sure we don't release it twice.
2376          */
2377         if ((ret = __memp_fget(mpf, &cp->pgno,
2378              dbc->thread_info, dbc->txn, 0, &cp->page)) != 0)
2379                 return (ret);
2380         memset(&dbt, 0, sizeof(DBT));
2381         if ((ret = __db_ret(dbc, cp->page, cp->indx, &dbt,
2382             &dbc->my_rkey.data, &dbc->my_rkey.ulen)) != 0)
2383                 goto err;
2384         ret = __memp_fput(mpf, dbc->thread_info, cp->page, dbc->priority);
2385         cp->page = NULL;
2386         if (ret != 0)
2387                 return (ret);
2388
2389         if ((ret = __bam_search(dbc, PGNO_INVALID, &dbt,
2390             F_ISSET(dbc, DBC_RMW) ? SR_FIND_WR : SR_FIND,
2391             1, &recno, &exact)) != 0)
2392                 goto err;
2393
2394         ret = __db_retcopy(dbc->env, data,
2395             &recno, sizeof(recno), &dbc->rdata->data, &dbc->rdata->ulen);
2396
2397         /* Release the stack. */
2398 err:    if ((t_ret = __bam_stkrel(dbc, 0)) != 0 && ret == 0)
2399                 ret = t_ret;
2400
2401         return (ret);
2402 }
2403
2404 /*
2405  * __bamc_writelock --
2406  *      Upgrade the cursor to a write lock.
2407  */
2408 static int
2409 __bamc_writelock(dbc)
2410         DBC *dbc;
2411 {
2412         BTREE_CURSOR *cp;
2413         int ret;
2414
2415         cp = (BTREE_CURSOR *)dbc->internal;
2416
2417         if (cp->lock_mode == DB_LOCK_WRITE)
2418                 return (0);
2419
2420         /*
2421          * When writing to an off-page duplicate tree, we need to have the
2422          * appropriate page in the primary tree locked.  The general DBC
2423          * code calls us first with the primary cursor so we can acquire the
2424          * appropriate lock.
2425          */
2426         ACQUIRE_WRITE_LOCK(dbc, ret);
2427         return (ret);
2428 }
2429
2430 /*
2431  * __bamc_next --
2432  *      Move to the next record.
2433  */
2434 static int
2435 __bamc_next(dbc, initial_move, deleted_okay)
2436         DBC *dbc;
2437         int initial_move, deleted_okay;
2438 {
2439         BTREE_CURSOR *cp;
2440         db_indx_t adjust;
2441         db_lockmode_t lock_mode;
2442         db_pgno_t pgno;
2443         int ret;
2444
2445         cp = (BTREE_CURSOR *)dbc->internal;
2446         ret = 0;
2447
2448         /*
2449          * We're either moving through a page of duplicates or a btree leaf
2450          * page.
2451          *
2452          * !!!
2453          * This code handles empty pages and pages with only deleted entries.
2454          */
2455         if (F_ISSET(dbc, DBC_OPD)) {
2456                 adjust = O_INDX;
2457                 lock_mode = DB_LOCK_NG;
2458         } else {
2459                 adjust = dbc->dbtype == DB_BTREE ? P_INDX : O_INDX;
2460                 lock_mode =
2461                     F_ISSET(dbc, DBC_RMW) ? DB_LOCK_WRITE : DB_LOCK_READ;
2462         }
2463         if (cp->page == NULL) {
2464                 ACQUIRE_CUR(dbc, lock_mode, cp->pgno, 0, ret);
2465                 if (ret != 0)
2466                         return (ret);
2467         }
2468
2469         if (initial_move)
2470                 cp->indx += adjust;
2471
2472         for (;;) {
2473                 /*
2474                  * If at the end of the page, move to a subsequent page.
2475                  *
2476                  * !!!
2477                  * Check for >= NUM_ENT.  If the original search landed us on
2478                  * NUM_ENT, we may have incremented indx before the test.
2479                  */
2480                 if (cp->indx >= NUM_ENT(cp->page)) {
2481                         if ((pgno = NEXT_PGNO(cp->page)) == PGNO_INVALID)
2482                                 return (DB_NOTFOUND);
2483
2484                         ACQUIRE_CUR(dbc, lock_mode, pgno, 0, ret);
2485                         if (ret != 0)
2486                                 return (ret);
2487                         cp->indx = 0;
2488                         continue;
2489                 }
2490                 if (!deleted_okay && IS_CUR_DELETED(dbc)) {
2491                         cp->indx += adjust;
2492                         continue;
2493                 }
2494                 break;
2495         }
2496         return (0);
2497 }
2498
2499 /*
2500  * __bamc_prev --
2501  *      Move to the previous record.
2502  */
2503 static int
2504 __bamc_prev(dbc)
2505         DBC *dbc;
2506 {
2507         BTREE_CURSOR *cp;
2508         db_indx_t adjust;
2509         db_lockmode_t lock_mode;
2510         db_pgno_t pgno;
2511         int ret;
2512
2513         cp = (BTREE_CURSOR *)dbc->internal;
2514         ret = 0;
2515
2516         /*
2517          * We're either moving through a page of duplicates or a btree leaf
2518          * page.
2519          *
2520          * !!!
2521          * This code handles empty pages and pages with only deleted entries.
2522          */
2523         if (F_ISSET(dbc, DBC_OPD)) {
2524                 adjust = O_INDX;
2525                 lock_mode = DB_LOCK_NG;
2526         } else {
2527                 adjust = dbc->dbtype == DB_BTREE ? P_INDX : O_INDX;
2528                 lock_mode =
2529                     F_ISSET(dbc, DBC_RMW) ? DB_LOCK_WRITE : DB_LOCK_READ;
2530         }
2531         if (cp->page == NULL) {
2532                 ACQUIRE_CUR(dbc, lock_mode, cp->pgno, 0, ret);
2533                 if (ret != 0)
2534                         return (ret);
2535         }
2536
2537         for (;;) {
2538                 /* If at the beginning of the page, move to a previous one. */
2539                 if (cp->indx == 0) {
2540                         if ((pgno =
2541                             PREV_PGNO(cp->page)) == PGNO_INVALID)
2542                                 return (DB_NOTFOUND);
2543
2544                         ACQUIRE_CUR(dbc, lock_mode, pgno, 0, ret);
2545                         if (ret != 0)
2546                                 return (ret);
2547
2548                         if ((cp->indx = NUM_ENT(cp->page)) == 0)
2549                                 continue;
2550                 }
2551
2552                 /* Ignore deleted records. */
2553                 cp->indx -= adjust;
2554                 if (IS_CUR_DELETED(dbc))
2555                         continue;
2556
2557                 break;
2558         }
2559         return (0);
2560 }
2561
2562 /*
2563  * __bamc_search --
2564  *      Move to a specified record.
2565  */
2566 static int
2567 __bamc_search(dbc, root_pgno, key, flags, exactp)
2568         DBC *dbc;
2569         db_pgno_t root_pgno;
2570         const DBT *key;
2571         u_int32_t flags;
2572         int *exactp;
2573 {
2574         BTREE *t;
2575         BTREE_CURSOR *cp;
2576         DB *dbp;
2577         PAGE *h;
2578         db_indx_t base, indx, *inp, lim;
2579         db_pgno_t bt_lpgno;
2580         db_recno_t recno;
2581         u_int32_t sflags;
2582         int bulk, cmp, ret, t_ret;
2583
2584         COMPQUIET(cmp, 0);
2585         dbp = dbc->dbp;
2586         cp = (BTREE_CURSOR *)dbc->internal;
2587         t = dbp->bt_internal;
2588         ret = 0;
2589         bulk = (F_ISSET(dbc, DBC_BULK) && cp->pgno != PGNO_INVALID);
2590
2591         /*
2592          * Find an entry in the database.  Discard any lock we currently hold,
2593          * we're going to search the tree.
2594          */
2595         DISCARD_CUR(dbc, ret);
2596         if (ret != 0)
2597                 return (ret);
2598
2599         switch (flags) {
2600         case DB_FIRST:
2601                 sflags = (F_ISSET(dbc, DBC_RMW) ? SR_WRITE : SR_READ) | SR_MIN;
2602                 goto search;
2603         case DB_LAST:
2604                 sflags = (F_ISSET(dbc, DBC_RMW) ? SR_WRITE : SR_READ) | SR_MAX;
2605                 goto search;
2606         case DB_SET_RECNO:
2607                 if ((ret = __ram_getno(dbc, key, &recno, 0)) != 0)
2608                         return (ret);
2609                 sflags =
2610                     (F_ISSET(dbc, DBC_RMW) ?  SR_FIND_WR : SR_FIND) | SR_EXACT;
2611                 if ((ret = __bam_rsearch(dbc, &recno, sflags, 1, exactp)) != 0)
2612                         return (ret);
2613                 goto done;
2614         case DB_SET:
2615         case DB_GET_BOTH:
2616                 sflags =
2617                     (F_ISSET(dbc, DBC_RMW) ? SR_FIND_WR : SR_FIND) | SR_EXACT;
2618                 if (bulk)
2619                         break;
2620                 goto search;
2621         case DB_GET_BOTH_RANGE:
2622                 sflags = (F_ISSET(dbc, DBC_RMW) ? SR_FIND_WR : SR_FIND);
2623                 goto search;
2624         case DB_SET_RANGE:
2625                 sflags =
2626                     (F_ISSET(dbc, DBC_RMW) ? SR_WRITE : SR_READ) | SR_DUPFIRST;
2627                 goto search;
2628         case DB_KEYFIRST:
2629         case DB_NOOVERWRITE:
2630                 sflags = SR_KEYFIRST;
2631                 break;
2632         case DB_KEYLAST:
2633         case DB_NODUPDATA:
2634         case DB_OVERWRITE_DUP:
2635                 sflags = SR_KEYLAST;
2636                 break;
2637         default:
2638                 return (__db_unknown_flag(dbp->env, "__bamc_search", flags));
2639         }
2640
2641         /*
2642          * If the application has a history of inserting into the first or last
2643          * pages of the database, we check those pages first to avoid doing a
2644          * full search.  Similarly, if the cursor is configured as a bulk
2645          * cursor, check whether this operation belongs on the same page as the
2646          * last one.
2647          */
2648         if (bulk)
2649                 bt_lpgno = cp->pgno;
2650         else {
2651                 if (F_ISSET(dbc, DBC_OPD))
2652                         goto search;
2653
2654                 /*
2655                  * !!!
2656                  * We do not mutex protect the t->bt_lpgno field, which means
2657                  * that it can only be used in an advisory manner.  If we find
2658                  * page we can use, great.  If we don't, we don't care, we do
2659                  * it the slow way instead.  Regardless, copy it into a local
2660                  * variable, otherwise we might acquire a lock for a page and
2661                  * then read a different page because it changed underfoot.
2662                  */
2663                 bt_lpgno = t->bt_lpgno;
2664         }
2665
2666         /*
2667          * If the tree has no history of insertion, do it the slow way.
2668          */
2669         if (bt_lpgno == PGNO_INVALID)
2670                 goto search;
2671
2672         /*
2673          * Lock and retrieve the page on which we last inserted.
2674          *
2675          * The page may not exist: if a transaction created the page
2676          * and then aborted, the page might have been truncated from
2677          * the end of the file.  We don't want to wait on the lock.
2678          * The page may not even be relevant to this search.
2679          */
2680         h = NULL;
2681         ACQUIRE_CUR(dbc, DB_LOCK_WRITE, bt_lpgno, DB_LOCK_NOWAIT, ret);
2682         if (ret != 0) {
2683                 if (ret == DB_LOCK_DEADLOCK ||
2684                     ret == DB_LOCK_NOTGRANTED ||
2685                     ret == DB_PAGE_NOTFOUND)
2686                         ret = 0;
2687                 goto fast_miss;
2688         }
2689
2690         h = cp->page;
2691         inp = P_INP(dbp, h);
2692
2693         /*
2694          * It's okay if the page type isn't right or it's empty, it
2695          * just means that the world changed.
2696          */
2697         if (TYPE(h) != P_LBTREE || NUM_ENT(h) == 0)
2698                 goto fast_miss;
2699
2700         /* Verify that this page cannot have moved to another db. */
2701         if (F_ISSET(dbp, DB_AM_SUBDB) &&
2702             LOG_COMPARE(&t->bt_llsn, &LSN(h)) != 0)
2703                 goto fast_miss;
2704
2705         /*
2706          * What we do here is test to see if we're at the beginning or
2707          * end of the tree and if the new item sorts before/after the
2708          * first/last page entry.  We only try to catch inserts into
2709          * the middle of the tree for bulk cursors.
2710          */
2711         if (h->next_pgno == PGNO_INVALID) {
2712                 indx = NUM_ENT(h) - P_INDX;
2713                 if ((ret = __bam_cmp(dbc, key, h, indx,
2714                     t->bt_compare, &cmp)) != 0)
2715                         goto fast_miss;
2716                 if (cmp > 0) {
2717                         if (FLD_ISSET(sflags, SR_EXACT))
2718                                 return (DB_NOTFOUND);
2719                         else
2720                                 indx += P_INDX;
2721                 }
2722                 if (cmp >= 0)
2723                         goto fast_hit;
2724         }
2725         if (h->prev_pgno == PGNO_INVALID) {
2726                 indx = 0;
2727                 if ((ret = __bam_cmp(dbc, key, h, indx,
2728                     t->bt_compare, &cmp)) != 0)
2729                         goto fast_miss;
2730                 if (cmp < 0 && FLD_ISSET(sflags, SR_EXACT))
2731                         return (DB_NOTFOUND);
2732                 if (cmp <= 0)
2733                         goto fast_hit;
2734         }
2735         if (bulk) {
2736                 DB_BINARY_SEARCH_FOR(base, lim, NUM_ENT(h), P_INDX) {
2737                         DB_BINARY_SEARCH_INCR(indx, base, lim, P_INDX);
2738                         if ((ret = __bam_cmp(dbc, key, h, indx,
2739                             t->bt_compare, &cmp)) != 0)
2740                                 goto fast_miss;
2741
2742                         if (cmp == 0)
2743                                 goto fast_hit;
2744                         if (cmp > 0)
2745                                 DB_BINARY_SEARCH_SHIFT_BASE(indx, base,
2746                                     lim, P_INDX);
2747                 }
2748                 /*
2749                  * No match found: base is the smallest index greater than
2750                  * the key and may be zero or NUM_ENT(h).
2751                  */
2752                 indx = base;
2753                 if (indx > 0 && indx < NUM_ENT(h)) {
2754                         if (FLD_ISSET(sflags, SR_EXACT))
2755                                 return (DB_NOTFOUND);
2756                         goto fast_hit;
2757                 }
2758         }
2759         goto fast_miss;
2760
2761 fast_hit:
2762         if (cmp == 0) {
2763                 /*
2764                  * Found a duplicate.  Deal with DB_KEYFIRST / DB_KEYLAST.
2765                  */
2766                 if (FLD_ISSET(sflags, SR_DUPFIRST))
2767                         while (indx > 0 && inp[indx - P_INDX] == inp[indx])
2768                                 indx -= P_INDX;
2769                 else if (FLD_ISSET(sflags, SR_DUPLAST))
2770                         while (indx < (db_indx_t)(NUM_ENT(h) - P_INDX) &&
2771                             inp[indx] == inp[indx + P_INDX])
2772                                 indx += P_INDX;
2773         }
2774
2775         /* Set the exact match flag, we may have found a duplicate. */
2776         *exactp = (cmp == 0);
2777
2778         /*
2779          * Insert the entry in the stack.  (Our caller is likely to
2780          * call __bam_stkrel() after our return.)
2781          */
2782         BT_STK_CLR(cp);
2783         BT_STK_ENTER(dbp->env,
2784             cp, h, indx, cp->lock, cp->lock_mode, ret);
2785         if (ret != 0)
2786                 return (ret);
2787         goto done;
2788
2789 fast_miss:
2790         /*
2791          * This was not the right page, so we do not need to retain
2792          * the lock even in the presence of transactions.
2793          *
2794          * This is also an error path, so ret may have been set.
2795          */
2796         DISCARD_CUR(dbc, ret);
2797         cp->pgno = PGNO_INVALID;
2798         if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0)
2799                 ret = t_ret;
2800         if (ret != 0)
2801                 return (ret);
2802
2803 search:
2804         if ((ret = __bam_search(dbc, root_pgno,
2805             key, sflags, 1, NULL, exactp)) != 0)
2806                 return (ret);
2807
2808 done:   /* Initialize the cursor from the stack. */
2809         cp->page = cp->csp->page;
2810         cp->pgno = cp->csp->page->pgno;
2811         cp->indx = cp->csp->indx;
2812         cp->lock = cp->csp->lock;
2813         cp->lock_mode = cp->csp->lock_mode;
2814
2815         /* If on an empty page or a deleted record, move to the next one. */
2816         if (flags == DB_FIRST &&
2817             (NUM_ENT(cp->page) == 0 || IS_CUR_DELETED(dbc)))
2818                 if ((ret = __bamc_next(dbc, 0, 0)) != 0)
2819                         return (ret);
2820         if (flags == DB_LAST &&
2821             (NUM_ENT(cp->page) == 0 || IS_CUR_DELETED(dbc)))
2822                 if ((ret = __bamc_prev(dbc)) != 0)
2823                         return (ret);
2824
2825         return (0);
2826 }
2827
2828 /*
2829  * __bamc_physdel --
2830  *      Physically remove an item from the page.
2831  */
2832 static int
2833 __bamc_physdel(dbc)
2834         DBC *dbc;
2835 {
2836         BTREE_CURSOR *cp;
2837         DB *dbp;
2838         DBT key;
2839         DB_LOCK next_lock, prev_lock;
2840         db_pgno_t pgno;
2841         int delete_page, empty_page, exact, ret;
2842
2843         dbp = dbc->dbp;
2844         memset(&key, 0, sizeof(DBT));
2845         cp = (BTREE_CURSOR *)dbc->internal;
2846         delete_page = empty_page = ret = 0;
2847         LOCK_INIT(next_lock);
2848         LOCK_INIT(prev_lock);
2849
2850         /* If the page is going to be emptied, consider deleting it. */
2851         delete_page = empty_page =
2852             NUM_ENT(cp->page) == (TYPE(cp->page) == P_LBTREE ? 2 : 1);
2853
2854         /*
2855          * Check if the application turned off reverse splits.  Applications
2856          * can't turn off reverse splits in off-page duplicate trees, that
2857          * space will never be reused unless the exact same key is specified.
2858          */
2859         if (delete_page &&
2860             !F_ISSET(dbc, DBC_OPD) && F_ISSET(dbp, DB_AM_REVSPLITOFF))
2861                 delete_page = 0;
2862
2863         /*
2864          * We never delete the last leaf page.  (Not really true -- we delete
2865          * the last leaf page of off-page duplicate trees, but that's handled
2866          * by our caller, not down here.)
2867          */
2868         if (delete_page && cp->pgno == BAM_ROOT_PGNO(dbc))
2869                 delete_page = 0;
2870
2871         /*
2872          * To delete a leaf page other than an empty root page, we need a
2873          * copy of a key from the page.  Use the 0th page index since it's
2874          * the last key the page held.
2875          *
2876          * !!!
2877          * Note that because __bamc_physdel is always called from a cursor
2878          * close, it should be safe to use the cursor's own "my_rkey" memory
2879          * to temporarily hold this key.  We shouldn't own any returned-data
2880          * memory of interest--if we do, we're in trouble anyway.
2881          */
2882         if (delete_page) {
2883                 if ((ret = __db_ret(dbc, cp->page, 0, &key,
2884                     &dbc->my_rkey.data, &dbc->my_rkey.ulen)) != 0)
2885                         goto err;
2886         }
2887
2888         /*
2889          * Delete the items.  If page isn't empty, we adjust the cursors.
2890          *
2891          * !!!
2892          * The following operations to delete a page may deadlock.  The easy
2893          * scenario is if we're deleting an item because we're closing cursors
2894          * because we've already deadlocked and want to call txn->abort.  If
2895          * we fail due to deadlock, we'll leave a locked, possibly empty page
2896          * in the tree, which won't be empty long because we'll undo the delete
2897          * when we undo the transaction's modifications.
2898          *
2899          * !!!
2900          * Delete the key item first, otherwise the on-page duplicate checks
2901          * in __bam_ditem() won't work!
2902          */
2903         if ((ret = __memp_dirty(dbp->mpf,
2904             &cp->page, dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
2905                 goto err;
2906         if (TYPE(cp->page) == P_LBTREE) {
2907                 if ((ret = __bam_ditem(dbc, cp->page, cp->indx)) != 0)
2908                         goto err;
2909                 if (!empty_page)
2910                         if ((ret = __bam_ca_di(dbc,
2911                             PGNO(cp->page), cp->indx, -1)) != 0)
2912                                 goto err;
2913         }
2914         if ((ret = __bam_ditem(dbc, cp->page, cp->indx)) != 0)
2915                 goto err;
2916
2917         /* Clear the deleted flag, the item is gone. */
2918         F_CLR(cp, C_DELETED);
2919
2920         if (!empty_page)
2921                 if ((ret = __bam_ca_di(dbc, PGNO(cp->page), cp->indx, -1)) != 0)
2922                         goto err;
2923
2924         /*
2925          * Need to downgrade write locks here or non-txn locks will get stuck.
2926          */
2927         if (F_ISSET(dbc->dbp, DB_AM_READ_UNCOMMITTED)) {
2928                 if ((ret = __TLPUT(dbc, cp->lock)) != 0)
2929                         goto err;
2930                 cp->lock_mode = DB_LOCK_WWRITE;
2931                 if (cp->page != NULL &&
2932                     (ret = __memp_shared(dbp->mpf, cp->page)) != 0)
2933                         goto err;
2934         }
2935         /* If we're not going to try and delete the page, we're done. */
2936         if (!delete_page)
2937                 return (0);
2938
2939         /*
2940          * Lock the previous and next pages before latching the parent
2941          * sub tree.
2942          */
2943         if (STD_LOCKING(dbc)) {
2944                 if ((pgno = PREV_PGNO(cp->page)) != PGNO_INVALID &&
2945                     (ret = __db_lget(dbc,
2946                     0, pgno, DB_LOCK_WRITE, 0, &prev_lock)) != 0)
2947                         return (ret);
2948                 if ((pgno = NEXT_PGNO(cp->page)) != PGNO_INVALID &&
2949                     (ret = __db_lget(dbc,
2950                     0, pgno, DB_LOCK_WRITE, 0, &next_lock)) != 0) {
2951                         (void)__TLPUT(dbc, next_lock);
2952                         return (ret);
2953                 }
2954         }
2955         DISCARD_CUR(dbc, ret);
2956         if (ret != 0)
2957                 goto err;
2958         ret = __bam_search(dbc, PGNO_INVALID, &key, SR_DEL, 0, NULL, &exact);
2959
2960         /*
2961          * If everything worked, delete the stack, otherwise, release the
2962          * stack and page locks without further damage.
2963          */
2964         if (ret == 0)
2965                 ret = __bam_dpages(dbc, 1, BTD_RELINK);
2966         else
2967                 (void)__bam_stkrel(dbc, 0);
2968
2969 err:    if (ret != 0)
2970                 F_SET(dbc, DBC_ERROR);
2971         (void)__TLPUT(dbc, prev_lock);
2972         (void)__TLPUT(dbc, next_lock);
2973         return (ret);
2974 }
2975
2976 /*
2977  * __bamc_getstack --
2978  *      Acquire a full stack for a cursor.
2979  */
2980 static int
2981 __bamc_getstack(dbc)
2982         DBC *dbc;
2983 {
2984         BTREE_CURSOR *cp;
2985         DB *dbp;
2986         DBT dbt;
2987         DB_MPOOLFILE *mpf;
2988         PAGE *h;
2989         int exact, ret, t_ret;
2990
2991         dbp = dbc->dbp;
2992         mpf = dbp->mpf;
2993         cp = (BTREE_CURSOR *)dbc->internal;
2994
2995         /*
2996          * Get the page with the current item on it.  The caller of this
2997          * routine has to already hold a read lock on the page, so there
2998          * is no additional lock to acquire.
2999          */
3000         if ((ret = __memp_fget(mpf, &cp->pgno,
3001              dbc->thread_info, dbc->txn, 0, &h)) != 0)
3002                 return (ret);
3003
3004         /* Get a copy of a key from the page. */
3005         memset(&dbt, 0, sizeof(DBT));
3006         ret = __db_ret(dbc, h, 0, &dbt,
3007              &dbc->my_rkey.data, &dbc->my_rkey.ulen);
3008         if ((t_ret = __memp_fput(mpf,
3009              dbc->thread_info, h, dbc->priority)) != 0 && ret == 0)
3010                 ret = t_ret;
3011         if (ret != 0)
3012                 return (ret);
3013
3014         /* Get a write-locked stack for the page. */
3015         exact = 0;
3016         ret = __bam_search(dbc, PGNO_INVALID,
3017             &dbt, SR_KEYFIRST, 1, NULL, &exact);
3018
3019         return (ret);
3020 }
3021
3022 /*
3023  * __bam_isopd --
3024  *      Return if the cursor references an off-page duplicate tree via its
3025  *      page number.
3026  */
3027 static int
3028 __bam_isopd(dbc, pgnop)
3029         DBC *dbc;
3030         db_pgno_t *pgnop;
3031 {
3032         BOVERFLOW *bo;
3033
3034         if (TYPE(dbc->internal->page) != P_LBTREE)
3035                 return (0);
3036
3037         bo = GET_BOVERFLOW(dbc->dbp,
3038             dbc->internal->page, dbc->internal->indx + O_INDX);
3039         if (B_TYPE(bo->type) == B_DUPLICATE) {
3040                 *pgnop = bo->pgno;
3041                 return (1);
3042         }
3043         return (0);
3044 }
3045
3046 /*
3047  * __bam_opd_exists --
3048  *      Return if the current position has any data.
3049  * PUBLIC: int  __bam_opd_exists __P((DBC *, db_pgno_t));
3050  */
3051 int
3052 __bam_opd_exists(dbc, pgno)
3053         DBC *dbc;
3054         db_pgno_t pgno;
3055 {
3056         PAGE *h;
3057         int ret;
3058
3059         if ((ret = __memp_fget(dbc->dbp->mpf, &pgno,
3060             dbc->thread_info, dbc->txn, 0, &h)) != 0)
3061                 return (ret);
3062
3063         /*
3064          * We always collapse OPD trees so we only need to check
3065          * the number of entries on the root.  If there is a non-empty
3066          * tree then there will be duplicates.
3067          */
3068         if (NUM_ENT(h) == 0)
3069                 ret = 0;
3070         else
3071                 ret = DB_KEYEXIST;
3072
3073         (void)__memp_fput(dbc->dbp->mpf, dbc->thread_info, h, dbc->priority);
3074
3075         return (ret);
3076 }