Remove definition of builtin function
[platform/upstream/db4.git] / btree / bt_cursor.c
1 /*-
2  * See the file LICENSE for redistribution information.
3  *
4  * Copyright (c) 1996-2009 Oracle.  All rights reserved.
5  *
6  * $Id$
7  */
8
9 #include "db_config.h"
10
11 #include "db_int.h"
12 #include "dbinc/db_page.h"
13 #include "dbinc/btree.h"
14 #include "dbinc/lock.h"
15 #include "dbinc/mp.h"
16
17 static int  __bam_bulk __P((DBC *, DBT *, u_int32_t));
18 static int  __bamc_close __P((DBC *, db_pgno_t, int *));
19 static int  __bamc_del __P((DBC *, u_int32_t));
20 static int  __bamc_destroy __P((DBC *));
21 static int  __bamc_get __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
22 static int  __bamc_getstack __P((DBC *));
23 static int  __bamc_next __P((DBC *, int, int));
24 static int  __bamc_physdel __P((DBC *));
25 static int  __bamc_prev __P((DBC *));
26 static int  __bamc_put __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
27 static int  __bamc_search __P((DBC *,
28                 db_pgno_t, const DBT *, u_int32_t, int *));
29 static int  __bamc_writelock __P((DBC *));
30 static int  __bam_getboth_finddatum __P((DBC *, DBT *, u_int32_t));
31 static int  __bam_getbothc __P((DBC *, DBT *));
32 static int  __bam_get_prev __P((DBC *));
33 static int  __bam_isopd __P((DBC *, db_pgno_t *));
34 #ifdef HAVE_COMPRESSION
35 static int  __bam_getlte __P((DBC *, DBT *, DBT *));
36 #endif
37
38 /*
39  * Acquire a new page/lock.  If we hold a page/lock, discard the page, and
40  * lock-couple the lock.
41  *
42  * !!!
43  * We have to handle both where we have a lock to lock-couple and where we
44  * don't -- we don't duplicate locks when we duplicate cursors if we are
45  * running in a transaction environment as there's no point if locks are
46  * never discarded.  This means that the cursor may or may not hold a lock.
47  * In the case where we are descending the tree we always want to unlock
48  * the held interior page so we use ACQUIRE_COUPLE.
49  */
50 #undef  ACQUIRE
51 #define ACQUIRE(dbc, mode, lpgno, lock, fpgno, pagep, flags, ret) do {  \
52         DB_MPOOLFILE *__mpf = (dbc)->dbp->mpf;                          \
53         if ((pagep) != NULL) {                                          \
54                 ret = __memp_fput(__mpf,                                \
55                      (dbc)->thread_info, pagep, dbc->priority);         \
56                 pagep = NULL;                                           \
57         } else                                                          \
58                 ret = 0;                                                \
59         if ((ret) == 0 && STD_LOCKING(dbc))                             \
60                 ret = __db_lget(                                        \
61                     dbc, LCK_COUPLE, lpgno, mode, flags, &(lock));      \
62         if ((ret) == 0)                                                 \
63                 ret = __memp_fget(__mpf, &(fpgno),                      \
64                     (dbc)->thread_info, (dbc)->txn, 0, &(pagep));       \
65 } while (0)
66
67 /* Acquire a new page/lock for a cursor. */
68 #undef  ACQUIRE_CUR
69 #define ACQUIRE_CUR(dbc, mode, p, flags, ret) do {                      \
70         BTREE_CURSOR *__cp = (BTREE_CURSOR *)(dbc)->internal;           \
71         if (p != __cp->pgno)                                            \
72                 __cp->pgno = PGNO_INVALID;                              \
73         ACQUIRE(dbc, mode, p, __cp->lock, p, __cp->page, flags, ret);   \
74         if ((ret) == 0) {                                               \
75                 __cp->pgno = p;                                         \
76                 __cp->lock_mode = (mode);                               \
77         }                                                               \
78 } while (0)
79
80 /*
81  * Acquire a write lock if we don't already have one.
82  *
83  * !!!
84  * See ACQUIRE macro on why we handle cursors that don't have locks.
85  */
86 #undef  ACQUIRE_WRITE_LOCK
87 #define ACQUIRE_WRITE_LOCK(dbc, ret) do {                               \
88         BTREE_CURSOR *__cp = (BTREE_CURSOR *)(dbc)->internal;           \
89         DB_MPOOLFILE *__mpf = (dbc)->dbp->mpf;                          \
90         int __get_page = 0;                                             \
91         ret = 0;                                                        \
92         if (STD_LOCKING(dbc) && __cp->lock_mode != DB_LOCK_WRITE) {     \
93                 if (__cp->page != NULL) {                               \
94                         (ret) = __memp_fput(__mpf, (dbc)->thread_info,  \
95                             __cp->page, (dbc)->priority);               \
96                         __cp->page = NULL;                              \
97                         __get_page = 1;                                 \
98                         if ((ret) !=0)                                  \
99                                 break;                                  \
100                 }                                                       \
101                 if (((ret) = __db_lget((dbc),                           \
102                     LOCK_ISSET(__cp->lock) ? LCK_COUPLE : 0,            \
103                     __cp->pgno, DB_LOCK_WRITE, 0, &__cp->lock)) != 0)   \
104                         break;                                          \
105                 __cp->lock_mode = DB_LOCK_WRITE;                        \
106                 if (__get_page == 0)                                    \
107                         break;                                          \
108                 (ret) = __memp_fget(__mpf, &__cp->pgno,         \
109                     (dbc)->thread_info,                                 \
110                     (dbc)->txn, DB_MPOOL_DIRTY, &__cp->page);           \
111         }                                                               \
112 } while (0)
113
114 /* Discard the current page/lock for a cursor. */
115 #undef  DISCARD_CUR
116 #define DISCARD_CUR(dbc, ret) do {                                      \
117         BTREE_CURSOR *__cp = (BTREE_CURSOR *)(dbc)->internal;           \
118         DB_MPOOLFILE *__mpf = (dbc)->dbp->mpf;                          \
119         int __t_ret;                                                    \
120         if ((__cp->page) != NULL) {                                     \
121                 __t_ret = __memp_fput(__mpf,                            \
122                      (dbc)->thread_info, __cp->page, dbc->priority);\
123                 __cp->page = NULL;                                      \
124         } else                                                          \
125                 __t_ret = 0;                                            \
126         if (__t_ret != 0 && (ret) == 0)                                 \
127                 ret = __t_ret;                                          \
128         __t_ret = __TLPUT((dbc), __cp->lock);                           \
129         if (__t_ret != 0 && (ret) == 0)                                 \
130                 ret = __t_ret;                                          \
131         if ((ret) == 0 && !LOCK_ISSET(__cp->lock))                      \
132                 __cp->lock_mode = DB_LOCK_NG;                           \
133         __cp->stream_start_pgno = PGNO_INVALID;                         \
134 } while (0)
135
136 /* If on-page item is a deleted record. */
137 #undef  IS_DELETED
138 #define IS_DELETED(dbp, page, indx)                                     \
139         B_DISSET(GET_BKEYDATA(dbp, page,                                \
140             (indx) + (TYPE(page) == P_LBTREE ? O_INDX : 0))->type)
141 #undef  IS_CUR_DELETED
142 #define IS_CUR_DELETED(dbc)                                             \
143         IS_DELETED((dbc)->dbp, (dbc)->internal->page, (dbc)->internal->indx)
144
145 /*
146  * Test to see if two cursors could point to duplicates of the same key.
147  * In the case of off-page duplicates they are they same, as the cursors
148  * will be in the same off-page duplicate tree.  In the case of on-page
149  * duplicates, the key index offsets must be the same.  For the last test,
150  * as the original cursor may not have a valid page pointer, we use the
151  * current cursor's.
152  */
153 #undef  IS_DUPLICATE
154 #define IS_DUPLICATE(dbc, i1, i2)                                       \
155             (P_INP((dbc)->dbp,((PAGE *)(dbc)->internal->page))[i1] ==   \
156              P_INP((dbc)->dbp,((PAGE *)(dbc)->internal->page))[i2])
157 #undef  IS_CUR_DUPLICATE
158 #define IS_CUR_DUPLICATE(dbc, orig_pgno, orig_indx)                     \
159         (F_ISSET(dbc, DBC_OPD) ||                                       \
160             (orig_pgno == (dbc)->internal->pgno &&                      \
161             IS_DUPLICATE(dbc, (dbc)->internal->indx, orig_indx)))
162
163 /*
164  * __bamc_init --
165  *      Initialize the access private portion of a cursor
166  *
167  * PUBLIC: int __bamc_init __P((DBC *, DBTYPE));
168  */
169 int
170 __bamc_init(dbc, dbtype)
171         DBC *dbc;
172         DBTYPE dbtype;
173 {
174         ENV *env;
175         int ret;
176 #ifdef HAVE_COMPRESSION
177         BTREE_CURSOR *cp;
178 #endif
179
180         env = dbc->env;
181
182         /* Allocate/initialize the internal structure. */
183         if (dbc->internal == NULL) {
184                 if ((ret = __os_calloc(
185                     env, 1, sizeof(BTREE_CURSOR), &dbc->internal)) != 0)
186                         return (ret);
187
188 #ifdef HAVE_COMPRESSION
189                 cp = (BTREE_CURSOR*)dbc->internal;
190                 cp->compressed.flags = DB_DBT_USERMEM;
191                 cp->key1.flags = DB_DBT_USERMEM;
192                 cp->key2.flags = DB_DBT_USERMEM;
193                 cp->data1.flags = DB_DBT_USERMEM;
194                 cp->data2.flags = DB_DBT_USERMEM;
195                 cp->del_key.flags = DB_DBT_USERMEM;
196                 cp->del_data.flags = DB_DBT_USERMEM;
197 #endif
198         }
199
200         /* Initialize methods. */
201         dbc->close = dbc->c_close = __dbc_close_pp;
202         dbc->cmp = __dbc_cmp_pp;
203         dbc->count = dbc->c_count = __dbc_count_pp;
204         dbc->del = dbc->c_del = __dbc_del_pp;
205         dbc->dup = dbc->c_dup = __dbc_dup_pp;
206         dbc->get = dbc->c_get = __dbc_get_pp;
207         dbc->pget = dbc->c_pget = __dbc_pget_pp;
208         dbc->put = dbc->c_put = __dbc_put_pp;
209         if (dbtype == DB_BTREE) {
210                 dbc->am_bulk = __bam_bulk;
211                 dbc->am_close = __bamc_close;
212                 dbc->am_del = __bamc_del;
213                 dbc->am_destroy = __bamc_destroy;
214                 dbc->am_get = __bamc_get;
215                 dbc->am_put = __bamc_put;
216                 dbc->am_writelock = __bamc_writelock;
217         } else {
218                 dbc->am_bulk = __bam_bulk;
219                 dbc->am_close = __bamc_close;
220                 dbc->am_del = __ramc_del;
221                 dbc->am_destroy = __bamc_destroy;
222                 dbc->am_get = __ramc_get;
223                 dbc->am_put = __ramc_put;
224                 dbc->am_writelock = __bamc_writelock;
225         }
226
227         return (0);
228 }
229
230 /*
231  * __bamc_refresh
232  *      Set things up properly for cursor re-use.
233  *
234  * PUBLIC: int __bamc_refresh __P((DBC *));
235  */
236 int
237 __bamc_refresh(dbc)
238         DBC *dbc;
239 {
240         BTREE *t;
241         BTREE_CURSOR *cp;
242         DB *dbp;
243
244         dbp = dbc->dbp;
245         t = dbp->bt_internal;
246         cp = (BTREE_CURSOR *)dbc->internal;
247
248         /*
249          * If our caller set the root page number, it's because the root was
250          * known.  This is always the case for off page dup cursors.  Else,
251          * pull it out of our internal information.
252          */
253         if (cp->root == PGNO_INVALID)
254                 cp->root = t->bt_root;
255
256         LOCK_INIT(cp->lock);
257         cp->lock_mode = DB_LOCK_NG;
258
259         if (cp->sp == NULL) {
260                 cp->sp = cp->stack;
261                 cp->esp = cp->stack + sizeof(cp->stack) / sizeof(cp->stack[0]);
262         }
263         BT_STK_CLR(cp);
264
265 #ifdef HAVE_COMPRESSION
266         /* Initialize compression */
267         cp->prevKey = 0;
268         cp->prevData = 0;
269         cp->currentKey = 0;
270         cp->currentData = 0;
271         cp->compcursor = 0;
272         cp->compend = 0;
273         cp->prevcursor = 0;
274         cp->prev2cursor = 0;
275 #endif
276
277         /*
278          * The btree leaf page data structures require that two key/data pairs
279          * (or four items) fit on a page, but other than that there's no fixed
280          * requirement.  The btree off-page duplicates only require two items,
281          * to be exact, but requiring four for them as well seems reasonable.
282          *
283          * Recno uses the btree bt_ovflsize value -- it's close enough.
284          */
285         cp->ovflsize = B_MINKEY_TO_OVFLSIZE(
286             dbp,  F_ISSET(dbc, DBC_OPD) ? 2 : t->bt_minkey, dbp->pgsize);
287
288         cp->recno = RECNO_OOB;
289         cp->order = INVALID_ORDER;
290         cp->flags = 0;
291
292         /* Initialize for record numbers. */
293         if (F_ISSET(dbc, DBC_OPD) ||
294             dbc->dbtype == DB_RECNO || F_ISSET(dbp, DB_AM_RECNUM)) {
295                 F_SET(cp, C_RECNUM);
296
297                 /*
298                  * All btrees that support record numbers, optionally standard
299                  * recno trees, and all off-page duplicate recno trees have
300                  * mutable record numbers.
301                  */
302                 if ((F_ISSET(dbc, DBC_OPD) && dbc->dbtype == DB_RECNO) ||
303                     F_ISSET(dbp, DB_AM_RECNUM | DB_AM_RENUMBER))
304                         F_SET(cp, C_RENUMBER);
305         }
306
307         return (0);
308 }
309
310 /*
311  * __bamc_close --
312  *      Close down the cursor.
313  */
314 static int
315 __bamc_close(dbc, root_pgno, rmroot)
316         DBC *dbc;
317         db_pgno_t root_pgno;
318         int *rmroot;
319 {
320         BTREE_CURSOR *cp, *cp_opd, *cp_c;
321         DB *dbp;
322         DBC *dbc_opd, *dbc_c;
323         DB_MPOOLFILE *mpf;
324         ENV *env;
325         PAGE *h;
326         int cdb_lock, count, ret;
327
328         dbp = dbc->dbp;
329         env = dbp->env;
330         mpf = dbp->mpf;
331         cp = (BTREE_CURSOR *)dbc->internal;
332         cp_opd = (dbc_opd = cp->opd) == NULL ?
333             NULL : (BTREE_CURSOR *)dbc_opd->internal;
334         cdb_lock = ret = 0;
335
336         /*
337          * There are 3 ways this function is called:
338          *
339          * 1. Closing a primary cursor: we get called with a pointer to a
340          *    primary cursor that has a NULL opd field.  This happens when
341          *    closing a btree/recno database cursor without an associated
342          *    off-page duplicate tree.
343          *
344          * 2. Closing a primary and an off-page duplicate cursor stack: we
345          *    get called with a pointer to the primary cursor which has a
346          *    non-NULL opd field.  This happens when closing a btree cursor
347          *    into database with an associated off-page btree/recno duplicate
348          *    tree. (It can't be a primary recno database, recno databases
349          *    don't support duplicates.)
350          *
351          * 3. Closing an off-page duplicate cursor stack: we get called with
352          *    a pointer to the off-page duplicate cursor.  This happens when
353          *    closing a non-btree database that has an associated off-page
354          *    btree/recno duplicate tree or for a btree database when the
355          *    opd tree is not empty (root_pgno == PGNO_INVALID).
356          *
357          * If either the primary or off-page duplicate cursor deleted a btree
358          * key/data pair, check to see if the item is still referenced by a
359          * different cursor.  If it is, confirm that cursor's delete flag is
360          * set and leave it to that cursor to do the delete.
361          *
362          * NB: The test for == 0 below is correct.  Our caller already removed
363          * our cursor argument from the active queue, we won't find it when we
364          * search the queue in __bam_ca_delete().
365          * NB: It can't be true that both the primary and off-page duplicate
366          * cursors have deleted a btree key/data pair.  Either the primary
367          * cursor may have deleted an item and there's no off-page duplicate
368          * cursor, or there's an off-page duplicate cursor and it may have
369          * deleted an item.
370          *
371          * Primary recno databases aren't an issue here.  Recno keys are either
372          * deleted immediately or never deleted, and do not have to be handled
373          * here.
374          *
375          * Off-page duplicate recno databases are an issue here, cases #2 and
376          * #3 above can both be off-page recno databases.  The problem is the
377          * same as the final problem for off-page duplicate btree databases.
378          * If we no longer need the off-page duplicate tree, we want to remove
379          * it.  For off-page duplicate btrees, we are done with the tree when
380          * we delete the last item it contains, i.e., there can be no further
381          * references to it when it's empty.  For off-page duplicate recnos,
382          * we remove items from the tree as the application calls the remove
383          * function, so we are done with the tree when we close the last cursor
384          * that references it.
385          *
386          * We optionally take the root page number from our caller.  If the
387          * primary database is a btree, we can get it ourselves because dbc
388          * is the primary cursor.  If the primary database is not a btree,
389          * the problem is that we may be dealing with a stack of pages.  The
390          * cursor we're using to do the delete points at the bottom of that
391          * stack and we need the top of the stack.
392          */
393         if (F_ISSET(cp, C_DELETED)) {
394                 dbc_c = dbc;
395                 switch (dbc->dbtype) {
396                 case DB_BTREE:                          /* Case #1, #3. */
397                         if ((ret = __bam_ca_delete(
398                             dbp, cp->pgno, cp->indx, 1, &count)) != 0)
399                                 goto err;
400                         if (count == 0)
401                                 goto lock;
402                         goto done;
403                 case DB_RECNO:
404                         if (!F_ISSET(dbc, DBC_OPD))     /* Case #1. */
405                                 goto done;
406                                                         /* Case #3. */
407                         if ((ret = __ram_ca_delete(dbp, cp->root, &count)) != 0)
408                                 goto err;
409                         if (count == 0)
410                                 goto lock;
411                         goto done;
412                 case DB_HASH:
413                 case DB_QUEUE:
414                 case DB_UNKNOWN:
415                 default:
416                         ret = __db_unknown_type(
417                             env, "DbCursor.close", dbc->dbtype);
418                         goto err;
419                 }
420         }
421
422         if (dbc_opd == NULL)
423                 goto done;
424
425         if (F_ISSET(cp_opd, C_DELETED)) {               /* Case #2. */
426                 /*
427                  * We will not have been provided a root page number.  Acquire
428                  * one from the primary database.
429                  */
430                 if ((h = cp->page) == NULL && (ret = __memp_fget(mpf, &cp->pgno,
431                      dbc->thread_info, dbc->txn, 0, &h)) != 0)
432                         goto err;
433                 root_pgno = GET_BOVERFLOW(dbp, h, cp->indx + O_INDX)->pgno;
434                 if ((ret = __memp_fput(mpf,
435                      dbc->thread_info, h, dbc->priority)) != 0)
436                         goto err;
437                 cp->page = NULL;
438
439                 dbc_c = dbc_opd;
440                 switch (dbc_opd->dbtype) {
441                 case DB_BTREE:
442                         if ((ret = __bam_ca_delete(
443                             dbp, cp_opd->pgno, cp_opd->indx, 1, &count)) != 0)
444                                 goto err;
445                         if (count == 0)
446                                 goto lock;
447                         goto done;
448                 case DB_RECNO:
449                         if ((ret =
450                             __ram_ca_delete(dbp, cp_opd->root, &count)) != 0)
451                                 goto err;
452                         if (count == 0)
453                                 goto lock;
454                         goto done;
455                 case DB_HASH:
456                 case DB_QUEUE:
457                 case DB_UNKNOWN:
458                 default:
459                         ret = __db_unknown_type(
460                            env, "DbCursor.close", dbc->dbtype);
461                         goto err;
462                 }
463         }
464         goto done;
465
466 lock:   cp_c = (BTREE_CURSOR *)dbc_c->internal;
467
468         /*
469          * If this is CDB, upgrade the lock if necessary.  While we acquired
470          * the write lock to logically delete the record, we released it when
471          * we returned from that call, and so may not be holding a write lock
472          * at the moment.
473          */
474         if (CDB_LOCKING(env)) {
475                 if (F_ISSET(dbc, DBC_WRITECURSOR)) {
476                         if ((ret = __lock_get(env,
477                             dbc->locker, DB_LOCK_UPGRADE, &dbc->lock_dbt,
478                             DB_LOCK_WRITE, &dbc->mylock)) != 0)
479                                 goto err;
480                         cdb_lock = 1;
481                 }
482                 goto delete;
483         }
484
485         /*
486          * The variable dbc_c has been initialized to reference the cursor in
487          * which we're going to do the delete.  Initialize the cursor's lock
488          * structures as necessary.
489          *
490          * First, we may not need to acquire any locks.  If we're in case #3,
491          * that is, the primary database isn't a btree database, our caller
492          * is responsible for acquiring any necessary locks before calling us.
493          */
494         if (F_ISSET(dbc, DBC_OPD))
495                 goto delete;
496
497         /*
498          * Otherwise, acquire a write lock on the primary database's page.
499          *
500          * Lock the primary database page, regardless of whether we're deleting
501          * an item on a primary database page or an off-page duplicates page.
502          *
503          * If the cursor that did the initial logical deletion (and had a write
504          * lock) is not the same cursor doing the physical deletion (which may
505          * have only ever had a read lock on the item), we need to upgrade to a
506          * write lock.  The confusion comes as follows:
507          *
508          *      C1      created, acquires item read lock
509          *      C2      dup C1, create C2, also has item read lock.
510          *      C1      acquire write lock, delete item
511          *      C1      close
512          *      C2      close, needs a write lock to physically delete item.
513          *
514          * If we're in a TXN, we know that C2 will be able to acquire the write
515          * lock, because no locker other than the one shared by C1 and C2 can
516          * acquire a write lock -- the original write lock C1 acquired was never
517          * discarded.
518          *
519          * If we're not in a TXN, it's nastier.  Other cursors might acquire
520          * read locks on the item after C1 closed, discarding its write lock,
521          * and such locks would prevent C2 from acquiring a read lock.  That's
522          * OK, though, we'll simply wait until we can acquire a write lock, or
523          * we'll deadlock.  (Which better not happen, since we're not in a TXN.)
524          *
525          * There are similar scenarios with dirty reads, where the cursor may
526          * have downgraded its write lock to a was-write lock.
527          */
528         if (STD_LOCKING(dbc))
529                 if ((ret = __db_lget(dbc,
530                     LCK_COUPLE, cp->pgno, DB_LOCK_WRITE, 0, &cp->lock)) != 0)
531                         goto err;
532
533 delete: /*
534          * If the delete occurred in a Btree, we're going to look at the page
535          * to see if the item has to be physically deleted.  Otherwise, we do
536          * not need the actual page (and it may not even exist, it might have
537          * been truncated from the file after an allocation aborted).
538          *
539          * Delete the on-page physical item referenced by the cursor.
540          */
541         if (dbc_c->dbtype == DB_BTREE) {
542                 if ((ret = __memp_fget(mpf, &cp_c->pgno, dbc->thread_info,
543                     dbc->txn, DB_MPOOL_DIRTY, &cp_c->page)) != 0)
544                         goto err;
545                 if ((ret = __bamc_physdel(dbc_c)) != 0)
546                         goto err;
547         }
548
549         /*
550          * If we're not working in an off-page duplicate tree, then we're
551          * done.
552          */
553         if (!F_ISSET(dbc_c, DBC_OPD) || root_pgno == PGNO_INVALID)
554                 goto done;
555
556         /*
557          * We may have just deleted the last element in the off-page duplicate
558          * tree, and closed the last cursor in the tree.  For an off-page btree
559          * there are no other cursors in the tree by definition, if the tree is
560          * empty.  For an off-page recno we know we have closed the last cursor
561          * in the tree because the __ram_ca_delete call above returned 0 only
562          * in that case.  So, if the off-page duplicate tree is empty at this
563          * point, we want to remove it.
564          */
565         if (((h = dbc_c->internal->page) == NULL || h->pgno != root_pgno) &&
566             (ret = __memp_fget(mpf,
567             &root_pgno, dbc->thread_info, dbc->txn, 0, &h)) != 0)
568                 goto err;
569         if (NUM_ENT(h) == 0) {
570                 if (h != dbc_c->internal->page)
571                         DISCARD_CUR(dbc_c, ret);
572                 else
573                         dbc_c->internal->page = NULL;
574                 if (ret != 0)
575                         goto err;
576                 if ((ret = __db_free(dbc, h)) != 0)
577                         goto err;
578         } else {
579                 if (h != dbc_c->internal->page && (ret = __memp_fput(mpf,
580                      dbc->thread_info, h, dbc->priority)) != 0)
581                         goto err;
582                 goto done;
583         }
584
585         /*
586          * When removing the tree, we have to do one of two things.  If this is
587          * case #2, that is, the primary tree is a btree, delete the key that's
588          * associated with the tree from the btree leaf page.  We know we are
589          * the only reference to it and we already have the correct lock.  We
590          * detect this case because the cursor that was passed to us references
591          * an off-page duplicate cursor.
592          *
593          * If this is case #3, that is, the primary tree isn't a btree, pass
594          * the information back to our caller, it's their job to do cleanup on
595          * the primary page.
596          */
597         if (dbc_opd != NULL) {
598                 if ((ret = __memp_fget(mpf, &cp->pgno, dbc->thread_info,
599                     dbc->txn, DB_MPOOL_DIRTY, &cp->page)) != 0)
600                         goto err;
601                 if ((ret = __bamc_physdel(dbc)) != 0)
602                         goto err;
603         } else
604                 *rmroot = 1;
605 err:
606 done:   /*
607          * Discard the page references and locks, and confirm that the stack
608          * has been emptied.
609          */
610         if (dbc_opd != NULL)
611                 DISCARD_CUR(dbc_opd, ret);
612         DISCARD_CUR(dbc, ret);
613
614         /* Downgrade any CDB lock we acquired. */
615         if (cdb_lock)
616                 (void)__lock_downgrade(env, &dbc->mylock, DB_LOCK_IWRITE, 0);
617
618         return (ret);
619 }
620
621 /*
622  * __bamc_cmp --
623  *      Compare two btree cursors for equality.
624  *
625  * This function is only called with two cursors that point to the same item.
626  * It only distinguishes cursors pointing to deleted and undeleted items at
627  * the same location.
628  *
629  * PUBLIC: int __bamc_cmp __P((DBC *, DBC *, int *));
630  */
631 int
632 __bamc_cmp(dbc, other_dbc, result)
633         DBC *dbc, *other_dbc;
634         int *result;
635 {
636         ENV *env;
637         BTREE_CURSOR *bcp, *obcp;
638
639         env = dbc->env;
640         bcp = (BTREE_CURSOR *)dbc->internal;
641         obcp = (BTREE_CURSOR *)other_dbc->internal;
642
643         DB_ASSERT (env, bcp->pgno == obcp->pgno);
644         DB_ASSERT (env, bcp->indx == obcp->indx);
645
646         /* Check to see if both cursors have the same deleted flag. */
647         *result =
648             ((F_ISSET(bcp, C_DELETED)) == F_ISSET(obcp, C_DELETED)) ? 0 : 1;
649         return (0);
650 }
651
652 /*
653  * __bamc_destroy --
654  *      Close a single cursor -- internal version.
655  */
656 static int
657 __bamc_destroy(dbc)
658         DBC *dbc;
659 {
660         BTREE_CURSOR *cp;
661         ENV *env;
662
663         cp = (BTREE_CURSOR *)dbc->internal;
664         env = dbc->env;
665
666         /* Discard the structures. */
667         if (cp->sp != cp->stack)
668                 __os_free(env, cp->sp);
669
670 #ifdef HAVE_COMPRESSION
671         /* Free the memory used for compression */
672         __os_free(env, cp->compressed.data);
673         __os_free(env, cp->key1.data);
674         __os_free(env, cp->key2.data);
675         __os_free(env, cp->data1.data);
676         __os_free(env, cp->data2.data);
677         __os_free(env, cp->del_key.data);
678         __os_free(env, cp->del_data.data);
679 #endif
680
681         __os_free(env, cp);
682
683         return (0);
684 }
685
686 /*
687  * __bamc_count --
688  *      Return a count of on and off-page duplicates.
689  *
690  * PUBLIC: int __bamc_count __P((DBC *, db_recno_t *));
691  */
692 int
693 __bamc_count(dbc, recnop)
694         DBC *dbc;
695         db_recno_t *recnop;
696 {
697         BTREE_CURSOR *cp;
698         DB *dbp;
699         DB_MPOOLFILE *mpf;
700         db_indx_t indx, top;
701         db_recno_t recno;
702         int ret;
703
704         dbp = dbc->dbp;
705         mpf = dbp->mpf;
706         cp = (BTREE_CURSOR *)dbc->internal;
707
708         /*
709          * Called with the top-level cursor that may reference an off-page
710          * duplicates tree.  We don't have to acquire any new locks, we have
711          * to have a read lock to even get here.
712          */
713         if (cp->opd == NULL) {
714                 /*
715                  * On-page duplicates, get the page and count.
716                  */
717                 DB_ASSERT(dbp->env, cp->page == NULL);
718                 if ((ret = __memp_fget(mpf, &cp->pgno,
719                     dbc->thread_info, dbc->txn, 0, &cp->page)) != 0)
720                         return (ret);
721
722                 /*
723                  * Move back to the beginning of the set of duplicates and
724                  * then count forward.
725                  */
726                 for (indx = cp->indx;; indx -= P_INDX)
727                         if (indx == 0 ||
728                             !IS_DUPLICATE(dbc, indx, indx - P_INDX))
729                                 break;
730                 for (recno = 0,
731                     top = NUM_ENT(cp->page) - P_INDX;; indx += P_INDX) {
732                         if (!IS_DELETED(dbp, cp->page, indx))
733                                 ++recno;
734                         if (indx == top ||
735                             !IS_DUPLICATE(dbc, indx, indx + P_INDX))
736                                 break;
737                 }
738         } else {
739                 /*
740                  * Off-page duplicates tree, get the root page of the off-page
741                  * duplicate tree.
742                  */
743                 if ((ret = __memp_fget(mpf, &cp->opd->internal->root,
744                     dbc->thread_info, dbc->txn, 0, &cp->page)) != 0)
745                         return (ret);
746
747                 /*
748                  * If the page is an internal page use the page's count as it's
749                  * up-to-date and reflects the status of cursors in the tree.
750                  * If the page is a leaf page for unsorted duplicates, use the
751                  * page's count as cursors don't mark items deleted on the page
752                  * and wait, cursor delete items immediately.
753                  * If the page is a leaf page for sorted duplicates, there may
754                  * be cursors on the page marking deleted items -- count.
755                  */
756                 if (TYPE(cp->page) == P_LDUP)
757                         for (recno = 0, indx = 0,
758                             top = NUM_ENT(cp->page) - O_INDX;; indx += O_INDX) {
759                                 if (!IS_DELETED(dbp, cp->page, indx))
760                                         ++recno;
761                                 if (indx == top)
762                                         break;
763                         }
764                 else
765                         recno = RE_NREC(cp->page);
766         }
767
768         *recnop = recno;
769
770         ret = __memp_fput(mpf, dbc->thread_info, cp->page, dbc->priority);
771         cp->page = NULL;
772
773         return (ret);
774 }
775
776 /*
777  * __bamc_del --
778  *      Delete using a cursor.
779  */
780 static int
781 __bamc_del(dbc, flags)
782         DBC *dbc;
783         u_int32_t flags;
784 {
785         BTREE_CURSOR *cp;
786         DB *dbp;
787         DB_MPOOLFILE *mpf;
788         int count, ret, t_ret;
789
790         dbp = dbc->dbp;
791         mpf = dbp->mpf;
792         cp = (BTREE_CURSOR *)dbc->internal;
793         ret = 0;
794         COMPQUIET(flags, 0);
795
796         /* If the item was already deleted, return failure. */
797         if (F_ISSET(cp, C_DELETED))
798                 return (DB_KEYEMPTY);
799
800         /*
801          * This code is always called with a page lock but no page.
802          */
803         DB_ASSERT(dbp->env, cp->page == NULL);
804
805         /*
806          * We don't physically delete the record until the cursor moves, so
807          * we have to have a long-lived write lock on the page instead of a
808          * a long-lived read lock.  Note, we have to have a read lock to even
809          * get here.
810          *
811          * If we're maintaining record numbers, we lock the entire tree, else
812          * we lock the single page.
813          */
814         if (F_ISSET(cp, C_RECNUM)) {
815                 if ((ret = __bamc_getstack(dbc)) != 0)
816                         goto err;
817                 cp->page = cp->csp->page;
818         } else {
819                 ACQUIRE_CUR(dbc, DB_LOCK_WRITE, cp->pgno, 0, ret);
820                 if (ret != 0)
821                         goto err;
822         }
823
824         /* Mark the page dirty. */
825         if ((ret = __memp_dirty(mpf,
826             &cp->page, dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
827                 goto err;
828
829         /* Log the change. */
830         if (DBC_LOGGING(dbc)) {
831                 if ((ret = __bam_cdel_log(dbp, dbc->txn, &LSN(cp->page), 0,
832                     PGNO(cp->page), &LSN(cp->page), cp->indx)) != 0)
833                         goto err;
834         } else
835                 LSN_NOT_LOGGED(LSN(cp->page));
836
837         /* Set the intent-to-delete flag on the page. */
838         if (TYPE(cp->page) == P_LBTREE)
839                 B_DSET(GET_BKEYDATA(dbp, cp->page, cp->indx + O_INDX)->type);
840         else
841                 B_DSET(GET_BKEYDATA(dbp, cp->page, cp->indx)->type);
842
843 err:    /*
844          * If we've been successful so far and the tree has record numbers,
845          * adjust the record counts.  Either way, release acquired page(s).
846          */
847         if (F_ISSET(cp, C_RECNUM)) {
848                 cp->csp->page = cp->page;
849                 if (ret == 0)
850                         ret = __bam_adjust(dbc, -1);
851                 (void)__bam_stkrel(dbc, 0);
852         } else
853                 if (cp->page != NULL &&
854                     (t_ret = __memp_fput(mpf, dbc->thread_info,
855                     cp->page, dbc->priority)) != 0 && ret == 0)
856                         ret = t_ret;
857
858         cp->page = NULL;
859
860         /*
861          * Update the cursors last, after all chance of recoverable failure
862          * is past.
863          */
864         if (ret == 0)
865                 ret = __bam_ca_delete(dbp, cp->pgno, cp->indx, 1, &count);
866
867         return (ret);
868 }
869
870 /*
871  * __bamc_dup --
872  *      Duplicate a btree cursor, such that the new one holds appropriate
873  *      locks for the position of the original.
874  *
875  * PUBLIC: int __bamc_dup __P((DBC *, DBC *, u_int32_t));
876  */
877 int
878 __bamc_dup(orig_dbc, new_dbc, flags)
879         DBC *orig_dbc, *new_dbc;
880         u_int32_t flags;
881 {
882         BTREE_CURSOR *orig, *new;
883
884         orig = (BTREE_CURSOR *)orig_dbc->internal;
885         new = (BTREE_CURSOR *)new_dbc->internal;
886
887         new->ovflsize = orig->ovflsize;
888         new->recno = orig->recno;
889         new->flags = orig->flags;
890
891 #ifdef HAVE_COMPRESSION
892         /* Copy the compression state */
893         return (__bamc_compress_dup(orig_dbc, new_dbc, flags));
894 #else
895         COMPQUIET(flags, 0);
896
897         return (0);
898 #endif
899 }
900
901 /*
902  * __bamc_get --
903  *      Get using a cursor (btree).
904  */
905 static int
906 __bamc_get(dbc, key, data, flags, pgnop)
907         DBC *dbc;
908         DBT *key, *data;
909         u_int32_t flags;
910         db_pgno_t *pgnop;
911 {
912         BTREE_CURSOR *cp;
913         DB *dbp;
914         DB_MPOOLFILE *mpf;
915         db_pgno_t orig_pgno;
916         db_indx_t orig_indx;
917         int exact, newopd, ret;
918
919         dbp = dbc->dbp;
920         mpf = dbp->mpf;
921         cp = (BTREE_CURSOR *)dbc->internal;
922         orig_pgno = cp->pgno;
923         orig_indx = cp->indx;
924
925         newopd = 0;
926         switch (flags) {
927         case DB_CURRENT:
928                 /* It's not possible to return a deleted record. */
929                 if (F_ISSET(cp, C_DELETED)) {
930                         ret = DB_KEYEMPTY;
931                         goto err;
932                 }
933
934                 /*
935                  * Acquire the current page.  We have at least a read-lock
936                  * already.  The caller may have set DB_RMW asking for a
937                  * write lock, but upgrading to a write lock has no better
938                  * chance of succeeding now instead of later, so don't try.
939                  */
940                 if ((ret = __memp_fget(mpf, &cp->pgno,
941                     dbc->thread_info, dbc->txn, 0, &cp->page)) != 0)
942                         goto err;
943                 break;
944         case DB_FIRST:
945                 newopd = 1;
946                 if ((ret = __bamc_search(dbc,
947                      PGNO_INVALID, NULL, flags, &exact)) != 0)
948                         goto err;
949                 break;
950         case DB_GET_BOTH:
951         case DB_GET_BOTH_RANGE:
952                 /*
953                  * There are two ways to get here based on DBcursor->get
954                  * with the DB_GET_BOTH/DB_GET_BOTH_RANGE flags set:
955                  *
956                  * 1. Searching a sorted off-page duplicate tree: do a tree
957                  * search.
958                  *
959                  * 2. Searching btree: do a tree search.  If it returns a
960                  * reference to off-page duplicate tree, return immediately
961                  * and let our caller deal with it.  If the search doesn't
962                  * return a reference to off-page duplicate tree, continue
963                  * with an on-page search.
964                  */
965                 if (F_ISSET(dbc, DBC_OPD)) {
966                         if ((ret = __bamc_search(
967                             dbc, PGNO_INVALID, data, flags, &exact)) != 0)
968                                 goto err;
969                         if (flags == DB_GET_BOTH) {
970                                 if (!exact) {
971                                         ret = DB_NOTFOUND;
972                                         goto err;
973                                 }
974                                 break;
975                         }
976
977                         /*
978                          * We didn't require an exact match, so the search may
979                          * may have returned an entry past the end of the page,
980                          * or we may be referencing a deleted record.  If so,
981                          * move to the next entry.
982                          */
983                         if ((cp->indx == NUM_ENT(cp->page) ||
984                             IS_CUR_DELETED(dbc)) &&
985                             (ret = __bamc_next(dbc, 1, 0)) != 0)
986                                 goto err;
987                 } else {
988                         if ((ret = __bamc_search(
989                             dbc, PGNO_INVALID, key, flags, &exact)) != 0)
990                                 return (ret);
991                         if (!exact) {
992                                 ret = DB_NOTFOUND;
993                                 goto err;
994                         }
995
996                         if (pgnop != NULL && __bam_isopd(dbc, pgnop)) {
997                                 newopd = 1;
998                                 break;
999                         }
1000                         if ((ret =
1001                             __bam_getboth_finddatum(dbc, data, flags)) != 0)
1002                                 goto err;
1003                 }
1004                 break;
1005 #ifdef HAVE_COMPRESSION
1006         case DB_SET_LTE:
1007                 if ((ret = __bam_getlte(dbc, key, NULL)) != 0)
1008                         goto err;
1009                 break;
1010         case DB_GET_BOTH_LTE:
1011                 if ((ret = __bam_getlte(dbc, key, data)) != 0)
1012                         goto err;
1013                 break;
1014 #endif
1015         case DB_GET_BOTHC:
1016                 if ((ret = __bam_getbothc(dbc, data)) != 0)
1017                         goto err;
1018                 break;
1019         case DB_LAST:
1020                 newopd = 1;
1021                 if ((ret = __bamc_search(dbc,
1022                      PGNO_INVALID, NULL, flags, &exact)) != 0)
1023                         goto err;
1024                 break;
1025         case DB_NEXT:
1026                 newopd = 1;
1027                 if (cp->pgno == PGNO_INVALID) {
1028                         if ((ret = __bamc_search(dbc,
1029                              PGNO_INVALID, NULL, DB_FIRST, &exact)) != 0)
1030                                 goto err;
1031                 } else
1032                         if ((ret = __bamc_next(dbc, 1, 0)) != 0)
1033                                 goto err;
1034                 break;
1035         case DB_NEXT_DUP:
1036                 if ((ret = __bamc_next(dbc, 1, 0)) != 0)
1037                         goto err;
1038                 if (!IS_CUR_DUPLICATE(dbc, orig_pgno, orig_indx)) {
1039                         ret = DB_NOTFOUND;
1040                         goto err;
1041                 }
1042                 break;
1043         case DB_NEXT_NODUP:
1044                 newopd = 1;
1045                 if (cp->pgno == PGNO_INVALID) {
1046                         if ((ret = __bamc_search(dbc,
1047                              PGNO_INVALID, NULL, DB_FIRST, &exact)) != 0)
1048                                 goto err;
1049                 } else
1050                         do {
1051                                 if ((ret = __bamc_next(dbc, 1, 0)) != 0)
1052                                         goto err;
1053                         } while (IS_CUR_DUPLICATE(dbc, orig_pgno, orig_indx));
1054                 break;
1055         case DB_PREV:
1056                 newopd = 1;
1057                 if (cp->pgno == PGNO_INVALID) {
1058                         if ((ret = __bamc_search(dbc,
1059                              PGNO_INVALID, NULL, DB_LAST, &exact)) != 0)
1060                                 goto err;
1061                 } else
1062                         if ((ret = __bamc_prev(dbc)) != 0)
1063                                 goto err;
1064                 break;
1065         case DB_PREV_DUP:
1066                 if ((ret = __bamc_prev(dbc)) != 0)
1067                         goto err;
1068                 if (!IS_CUR_DUPLICATE(dbc, orig_pgno, orig_indx)) {
1069                         ret = DB_NOTFOUND;
1070                         goto err;
1071                 }
1072                 break;
1073         case DB_PREV_NODUP:
1074                 newopd = 1;
1075                 if (cp->pgno == PGNO_INVALID) {
1076                         if ((ret = __bamc_search(dbc,
1077                              PGNO_INVALID, NULL, DB_LAST, &exact)) != 0)
1078                                 goto err;
1079                 } else
1080                         do {
1081                                 if ((ret = __bamc_prev(dbc)) != 0)
1082                                         goto err;
1083                         } while (IS_CUR_DUPLICATE(dbc, orig_pgno, orig_indx));
1084                 break;
1085         case DB_SET:
1086         case DB_SET_RECNO:
1087                 newopd = 1;
1088                 if ((ret = __bamc_search(dbc,
1089                     PGNO_INVALID, key, flags, &exact)) != 0)
1090                         goto err;
1091                 break;
1092         case DB_SET_RANGE:
1093                 newopd = 1;
1094                 if ((ret = __bamc_search(dbc,
1095                     PGNO_INVALID, key, flags, &exact)) != 0)
1096                         goto err;
1097
1098                 /*
1099                  * As we didn't require an exact match, the search function
1100                  * may have returned an entry past the end of the page.  Or,
1101                  * we may be referencing a deleted record.  If so, move to
1102                  * the next entry.
1103                  */
1104                 if (cp->indx == NUM_ENT(cp->page) || IS_CUR_DELETED(dbc))
1105                         if ((ret = __bamc_next(dbc, 0, 0)) != 0)
1106                                 goto err;
1107                 break;
1108         default:
1109                 ret = __db_unknown_flag(dbp->env, "__bamc_get", flags);
1110                 goto err;
1111         }
1112
1113         /*
1114          * We may have moved to an off-page duplicate tree.  Return that
1115          * information to our caller.
1116          */
1117         if (newopd && pgnop != NULL)
1118                 (void)__bam_isopd(dbc, pgnop);
1119
1120 err:    /*
1121          * Regardless of whether we were successful or not, if the cursor
1122          * moved, clear the delete flag, DBcursor->get never references a
1123          * deleted key, if it moved at all.
1124          */
1125         if (F_ISSET(cp, C_DELETED) &&
1126             (cp->pgno != orig_pgno || cp->indx != orig_indx))
1127                 F_CLR(cp, C_DELETED);
1128
1129         return (ret);
1130 }
1131
1132 static int
1133 __bam_get_prev(dbc)
1134         DBC *dbc;
1135 {
1136         BTREE_CURSOR *cp;
1137         DBT key, data;
1138         db_pgno_t pgno;
1139         int ret;
1140
1141         if ((ret = __bamc_prev(dbc)) != 0)
1142                 return (ret);
1143
1144         if (__bam_isopd(dbc, &pgno)) {
1145                 cp = (BTREE_CURSOR *)dbc->internal;
1146                 if ((ret = __dbc_newopd(dbc, pgno, cp->opd, &cp->opd)) != 0)
1147                         return (ret);
1148                 if ((ret = cp->opd->am_get(cp->opd,
1149                     &key, &data, DB_LAST, NULL)) != 0)
1150                         return (ret);
1151         }
1152
1153         return (0);
1154 }
1155
1156 /*
1157  * __bam_bulk -- Return bulk data from a btree.
1158  */
1159 static int
1160 __bam_bulk(dbc, data, flags)
1161         DBC *dbc;
1162         DBT *data;
1163         u_int32_t flags;
1164 {
1165         BKEYDATA *bk;
1166         BOVERFLOW *bo;
1167         BTREE_CURSOR *cp;
1168         PAGE *pg;
1169         db_indx_t *inp, indx, pg_keyoff;
1170         int32_t  *endp, key_off, *offp, *saveoffp;
1171         u_int8_t *dbuf, *dp, *np;
1172         u_int32_t key_size, pagesize, size, space;
1173         int adj, is_key, need_pg, next_key, no_dup, rec_key, ret;
1174
1175         ret = 0;
1176         key_off = 0;
1177         size = 0;
1178         pagesize = dbc->dbp->pgsize;
1179         cp = (BTREE_CURSOR *)dbc->internal;
1180
1181         /*
1182          * dp tracks the beginning of the page in the buffer.
1183          * np is the next place to copy things into the buffer.
1184          * dbuf always stays at the beginning of the buffer.
1185          */
1186         dbuf = data->data;
1187         np = dp = dbuf;
1188
1189         /* Keep track of space that is left.  There is a termination entry */
1190         space = data->ulen;
1191         space -= sizeof(*offp);
1192
1193         /* Build the offset/size table from the end up. */
1194         endp = (int32_t *)((u_int8_t *)dbuf + data->ulen);
1195         endp--;
1196         offp = endp;
1197
1198         key_size = 0;
1199
1200         /*
1201          * Distinguish between BTREE and RECNO.
1202          * There are no keys in RECNO.  If MULTIPLE_KEY is specified
1203          * then we return the record numbers.
1204          * is_key indicates that multiple btree keys are returned.
1205          * rec_key is set if we are returning record numbers.
1206          * next_key is set if we are going after the next key rather than dup.
1207          */
1208         if (dbc->dbtype == DB_BTREE) {
1209                 is_key = LF_ISSET(DB_MULTIPLE_KEY) ? 1: 0;
1210                 rec_key = 0;
1211                 next_key = is_key && LF_ISSET(DB_OPFLAGS_MASK) != DB_NEXT_DUP;
1212                 adj = 2;
1213         } else {
1214                 is_key = 0;
1215                 rec_key = LF_ISSET(DB_MULTIPLE_KEY) ? 1 : 0;
1216                 next_key = LF_ISSET(DB_OPFLAGS_MASK) != DB_NEXT_DUP;
1217                 adj = 1;
1218         }
1219         no_dup = LF_ISSET(DB_OPFLAGS_MASK) == DB_NEXT_NODUP;
1220
1221 next_pg:
1222         indx = cp->indx;
1223         pg = cp->page;
1224
1225         inp = P_INP(dbc->dbp, pg);
1226         /* The current page is not yet in the buffer. */
1227         need_pg = 1;
1228
1229         /*
1230          * Keep track of the offset of the current key on the page.
1231          * If we are returning keys, set it to 0 first so we force
1232          * the copy of the key to the buffer.
1233          */
1234         pg_keyoff = 0;
1235         if (is_key == 0)
1236                 pg_keyoff = inp[indx];
1237
1238         do {
1239                 if (IS_DELETED(dbc->dbp, pg, indx)) {
1240                         if (dbc->dbtype != DB_RECNO)
1241                                 continue;
1242
1243                         cp->recno++;
1244                         /*
1245                          * If we are not returning recnos then we
1246                          * need to fill in every slot so the user
1247                          * can calculate the record numbers.
1248                          */
1249                         if (rec_key != 0)
1250                                 continue;
1251
1252                         space -= 2 * sizeof(*offp);
1253                         /* Check if space as underflowed. */
1254                         if (space > data->ulen)
1255                                 goto back_up;
1256
1257                         /* Just mark the empty recno slots. */
1258                         *offp-- = 0;
1259                         *offp-- = 0;
1260                         continue;
1261                 }
1262
1263                 /*
1264                  * Check to see if we have a new key.
1265                  * If so, then see if we need to put the
1266                  * key on the page.  If its already there
1267                  * then we just point to it.
1268                  */
1269                 if (is_key && pg_keyoff != inp[indx]) {
1270                         bk = GET_BKEYDATA(dbc->dbp, pg, indx);
1271                         if (B_TYPE(bk->type) == B_OVERFLOW) {
1272                                 bo = (BOVERFLOW *)bk;
1273                                 size = key_size = bo->tlen;
1274                                 if (key_size > space)
1275                                         goto get_key_space;
1276                                 if ((ret = __bam_bulk_overflow(dbc,
1277                                     bo->tlen, bo->pgno, np)) != 0)
1278                                         return (ret);
1279                                 space -= key_size;
1280                                 key_off = (int32_t)(np - dbuf);
1281                                 np += key_size;
1282                         } else {
1283                                 if (need_pg) {
1284                                         dp = np;
1285                                         size = pagesize - HOFFSET(pg);
1286                                         if (space < size) {
1287 get_key_space:
1288                                                 /* Nothing added, then error. */
1289                                                 if (offp == endp) {
1290                                                         data->size = (u_int32_t)
1291                                                             DB_ALIGN(size +
1292                                                             pagesize, 1024);
1293                                                         return
1294                                                             (DB_BUFFER_SMALL);
1295                                                 }
1296                                                 /*
1297                                                  * We need to back up to the
1298                                                  * last record put into the
1299                                                  * buffer so that it is
1300                                                  * CURRENT.
1301                                                  */
1302                                                 if (indx != 0)
1303                                                         indx -= P_INDX;
1304                                                 else {
1305                                                         if ((ret =
1306                                                             __bam_get_prev(
1307                                                             dbc)) != 0)
1308                                                                 return (ret);
1309                                                         indx = cp->indx;
1310                                                         pg = cp->page;
1311                                                 }
1312                                                 break;
1313                                         }
1314                                         /*
1315                                          * Move the data part of the page
1316                                          * to the buffer.
1317                                          */
1318                                         memcpy(dp,
1319                                            (u_int8_t *)pg + HOFFSET(pg), size);
1320                                         need_pg = 0;
1321                                         space -= size;
1322                                         np += size;
1323                                 }
1324                                 key_size = bk->len;
1325                                 key_off = (int32_t)((inp[indx] - HOFFSET(pg))
1326                                     + (dp - dbuf) + SSZA(BKEYDATA, data));
1327                                 pg_keyoff = inp[indx];
1328                         }
1329                 }
1330
1331                 /*
1332                  * Reserve space for the pointers and sizes.
1333                  * Either key/data pair or just for a data item.
1334                  */
1335                 space -= (is_key ? 4 : 2) * sizeof(*offp);
1336                 if (rec_key)
1337                         space -= sizeof(*offp);
1338
1339                 /* Check to see if space has underflowed. */
1340                 if (space > data->ulen)
1341                         goto back_up;
1342
1343                 /*
1344                  * Determine if the next record is in the
1345                  * buffer already or if it needs to be copied in.
1346                  * If we have an off page dup, then copy as many
1347                  * as will fit into the buffer.
1348                  */
1349                 bk = GET_BKEYDATA(dbc->dbp, pg, indx + adj - 1);
1350                 if (B_TYPE(bk->type) == B_DUPLICATE) {
1351                         bo = (BOVERFLOW *)bk;
1352                         if (is_key) {
1353                                 *offp-- = (int32_t)key_off;
1354                                 *offp-- = (int32_t)key_size;
1355                         }
1356                         /*
1357                          * We pass the offset of the current key.
1358                          * On return we check to see if offp has
1359                          * moved to see if any data fit.
1360                          */
1361                         saveoffp = offp;
1362                         if ((ret = __bam_bulk_duplicates(dbc, bo->pgno,
1363                             dbuf, is_key ? offp + P_INDX : NULL,
1364                             &offp, &np, &space, no_dup)) != 0) {
1365                                 if (ret == DB_BUFFER_SMALL) {
1366                                         size = space;
1367                                         space = 0;
1368                                         /* If nothing was added, then error. */
1369                                         if (offp == saveoffp) {
1370                                                 offp += 2;
1371                                                 goto back_up;
1372                                         }
1373                                         goto get_space;
1374                                 }
1375                                 return (ret);
1376                         }
1377                 } else if (B_TYPE(bk->type) == B_OVERFLOW) {
1378                         bo = (BOVERFLOW *)bk;
1379                         size = bo->tlen;
1380                         if (size > space)
1381                                 goto back_up;
1382                         if ((ret =
1383                             __bam_bulk_overflow(dbc,
1384                                 bo->tlen, bo->pgno, np)) != 0)
1385                                 return (ret);
1386                         space -= size;
1387                         if (is_key) {
1388                                 *offp-- = (int32_t)key_off;
1389                                 *offp-- = (int32_t)key_size;
1390                         } else if (rec_key)
1391                                 *offp-- = (int32_t)cp->recno;
1392                         *offp-- = (int32_t)(np - dbuf);
1393                         np += size;
1394                         *offp-- = (int32_t)size;
1395                 } else {
1396                         if (need_pg) {
1397                                 dp = np;
1398                                 size = pagesize - HOFFSET(pg);
1399                                 if (space < size) {
1400 back_up:
1401                                         /*
1402                                          * Back up the index so that the
1403                                          * last record in the buffer is CURRENT
1404                                          */
1405                                         if (indx >= adj)
1406                                                 indx -= adj;
1407                                         else {
1408                                                 if ((ret =
1409                                                     __bam_get_prev(dbc)) != 0 &&
1410                                                     ret != DB_NOTFOUND)
1411                                                         return (ret);
1412                                                 indx = cp->indx;
1413                                                 pg = cp->page;
1414                                         }
1415                                         if (dbc->dbtype == DB_RECNO)
1416                                                 cp->recno--;
1417 get_space:
1418                                         /*
1419                                          * See if we put anything in the
1420                                          * buffer or if we are doing a DBP->get
1421                                          * did we get all of the data.
1422                                          */
1423                                         if (offp >=
1424                                             (is_key ? &endp[-1] : endp) ||
1425                                             F_ISSET(dbc, DBC_FROM_DB_GET)) {
1426                                                 data->size = (u_int32_t)
1427                                                     DB_ALIGN(size +
1428                                                     data->ulen - space, 1024);
1429                                                 return (DB_BUFFER_SMALL);
1430                                         }
1431                                         break;
1432                                 }
1433                                 memcpy(dp, (u_int8_t *)pg + HOFFSET(pg), size);
1434                                 need_pg = 0;
1435                                 space -= size;
1436                                 np += size;
1437                         }
1438                         /*
1439                          * Add the offsets and sizes to the end of the buffer.
1440                          * First add the key info then the data info.
1441                          */
1442                         if (is_key) {
1443                                 *offp-- = (int32_t)key_off;
1444                                 *offp-- = (int32_t)key_size;
1445                         } else if (rec_key)
1446                                 *offp-- = (int32_t)cp->recno;
1447                         *offp-- = (int32_t)((inp[indx + adj - 1] - HOFFSET(pg))
1448                             + (dp - dbuf) + SSZA(BKEYDATA, data));
1449                         *offp-- = bk->len;
1450                 }
1451                 if (dbc->dbtype == DB_RECNO)
1452                         cp->recno++;
1453                 else if (no_dup) {
1454                         while (indx + adj < NUM_ENT(pg) &&
1455                             pg_keyoff == inp[indx + adj])
1456                                 indx += adj;
1457                 }
1458         /*
1459          * Stop when we either run off the page or we move to the next key and
1460          * we are not returning multiple keys.
1461          */
1462         } while ((indx += adj) < NUM_ENT(pg) &&
1463             (next_key || pg_keyoff == inp[indx]));
1464
1465         /* If we are off the page then try to the next page. */
1466         if (ret == 0 && next_key && indx >= NUM_ENT(pg)) {
1467                 cp->indx = indx;
1468                 ret = __bamc_next(dbc, 0, 1);
1469                 if (ret == 0)
1470                         goto next_pg;
1471                 if (ret != DB_NOTFOUND)
1472                         return (ret);
1473         }
1474
1475         /*
1476          * If we did a DBP->get we must error if we did not return
1477          * all the data for the current key because there is
1478          * no way to know if we did not get it all, nor any
1479          * interface to fetch the balance.
1480          */
1481
1482         if (ret == 0 && indx < pg->entries &&
1483             F_ISSET(dbc, DBC_TRANSIENT) && pg_keyoff == inp[indx]) {
1484                 data->size = (data->ulen - space) + size;
1485                 return (DB_BUFFER_SMALL);
1486         }
1487         /*
1488          * Must leave the index pointing at the last record fetched.
1489          * If we are not fetching keys, we may have stepped to the
1490          * next key.
1491          */
1492         if (ret == DB_BUFFER_SMALL || next_key || pg_keyoff == inp[indx])
1493                 cp->indx = indx;
1494         else
1495                 cp->indx = indx - P_INDX;
1496
1497         if (rec_key == 1)
1498                 *offp = RECNO_OOB;
1499         else
1500                 *offp = -1;
1501         return (0);
1502 }
1503
1504 /*
1505  * __bam_bulk_overflow --
1506  *      Dump overflow record into the buffer.
1507  *      The space requirements have already been checked.
1508  * PUBLIC: int __bam_bulk_overflow
1509  * PUBLIC:    __P((DBC *, u_int32_t, db_pgno_t, u_int8_t *));
1510  */
1511 int
1512 __bam_bulk_overflow(dbc, len, pgno, dp)
1513         DBC *dbc;
1514         u_int32_t len;
1515         db_pgno_t pgno;
1516         u_int8_t *dp;
1517 {
1518         DBT dbt;
1519
1520         memset(&dbt, 0, sizeof(dbt));
1521         F_SET(&dbt, DB_DBT_USERMEM);
1522         dbt.ulen = len;
1523         dbt.data = (void *)dp;
1524         return (__db_goff(dbc, &dbt, len, pgno, NULL, NULL));
1525 }
1526
1527 /*
1528  * __bam_bulk_duplicates --
1529  *      Put as many off page duplicates as will fit into the buffer.
1530  * This routine will adjust the cursor to reflect the position in
1531  * the overflow tree.
1532  * PUBLIC: int __bam_bulk_duplicates __P((DBC *,
1533  * PUBLIC:       db_pgno_t, u_int8_t *, int32_t *,
1534  * PUBLIC:       int32_t **, u_int8_t **, u_int32_t *, int));
1535  */
1536 int
1537 __bam_bulk_duplicates(dbc, pgno, dbuf, keyoff, offpp, dpp, spacep, no_dup)
1538         DBC *dbc;
1539         db_pgno_t pgno;
1540         u_int8_t *dbuf;
1541         int32_t *keyoff, **offpp;
1542         u_int8_t **dpp;
1543         u_int32_t *spacep;
1544         int no_dup;
1545 {
1546         BKEYDATA *bk;
1547         BOVERFLOW *bo;
1548         BTREE_CURSOR *cp;
1549         DB *dbp;
1550         DBC *opd;
1551         DBT key, data;
1552         PAGE *pg;
1553         db_indx_t indx, *inp;
1554         int32_t *offp;
1555         u_int32_t pagesize, size, space;
1556         u_int8_t *dp, *np;
1557         int first, need_pg, ret, t_ret;
1558
1559         ret = 0;
1560
1561         dbp = dbc->dbp;
1562         cp = (BTREE_CURSOR *)dbc->internal;
1563         opd = cp->opd;
1564
1565         if (opd == NULL) {
1566                 if ((ret = __dbc_newopd(dbc, pgno, NULL, &opd)) != 0)
1567                         return (ret);
1568                 cp->opd = opd;
1569                 if ((ret = opd->am_get(opd,
1570                     &key, &data, DB_FIRST, NULL)) != 0)
1571                         goto close_opd;
1572         }
1573
1574         pagesize = opd->dbp->pgsize;
1575         cp = (BTREE_CURSOR *)opd->internal;
1576         space = *spacep;
1577         /* Get current offset slot. */
1578         offp = *offpp;
1579
1580         /*
1581          * np is the next place to put data.
1582          * dp is the beginning of the current page in the buffer.
1583          */
1584         np = dp = *dpp;
1585         first = 1;
1586         indx = cp->indx;
1587
1588         do {
1589                 /* Fetch the current record.  No initial move. */
1590                 if ((ret = __bamc_next(opd, 0, 0)) != 0)
1591                         break;
1592                 pg = cp->page;
1593                 indx = cp->indx;
1594                 inp = P_INP(dbp, pg);
1595                 /* We need to copy the page to the buffer. */
1596                 need_pg = 1;
1597
1598                 do {
1599                         if (IS_DELETED(dbp, pg, indx))
1600                                 goto contin;
1601                         bk = GET_BKEYDATA(dbp, pg, indx);
1602                         space -= 2 * sizeof(*offp);
1603                         /* Allocate space for key if needed. */
1604                         if (first == 0 && keyoff != NULL)
1605                                 space -= 2 * sizeof(*offp);
1606
1607                         /* Did space underflow? */
1608                         if (space > *spacep) {
1609                                 ret = DB_BUFFER_SMALL;
1610                                 if (first == 1) {
1611                                         /* Get the absolute value. */
1612                                         space = -(int32_t)space;
1613                                         space = *spacep + space;
1614                                         if (need_pg)
1615                                                 space += pagesize - HOFFSET(pg);
1616                                 }
1617                                 break;
1618                         }
1619                         if (B_TYPE(bk->type) == B_OVERFLOW) {
1620                                 bo = (BOVERFLOW *)bk;
1621                                 size = bo->tlen;
1622                                 if (size > space) {
1623                                         ret = DB_BUFFER_SMALL;
1624                                         space = *spacep + size;
1625                                         break;
1626                                 }
1627                                 if (first == 0 && keyoff != NULL) {
1628                                         *offp-- = keyoff[0];
1629                                         *offp-- = keyoff[-1];
1630                                 }
1631                                 if ((ret = __bam_bulk_overflow(dbc,
1632                                     bo->tlen, bo->pgno, np)) != 0)
1633                                         return (ret);
1634                                 space -= size;
1635                                 *offp-- = (int32_t)(np - dbuf);
1636                                 np += size;
1637                         } else {
1638                                 if (need_pg) {
1639                                         dp = np;
1640                                         size = pagesize - HOFFSET(pg);
1641                                         if (space < size) {
1642                                                 ret = DB_BUFFER_SMALL;
1643                                                 /* Return space required. */
1644                                                 space = *spacep + size;
1645                                                 break;
1646                                         }
1647                                         memcpy(dp,
1648                                             (u_int8_t *)pg + HOFFSET(pg), size);
1649                                         need_pg = 0;
1650                                         space -= size;
1651                                         np += size;
1652                                 }
1653                                 if (first == 0 && keyoff != NULL) {
1654                                         *offp-- = keyoff[0];
1655                                         *offp-- = keyoff[-1];
1656                                 }
1657                                 size = bk->len;
1658                                 *offp-- = (int32_t)((inp[indx] - HOFFSET(pg))
1659                                     + (dp - dbuf) + SSZA(BKEYDATA, data));
1660                         }
1661                         *offp-- = (int32_t)size;
1662                         first = 0;
1663                         if (no_dup)
1664                                 break;
1665 contin:
1666                         indx++;
1667                         if (opd->dbtype == DB_RECNO)
1668                                 cp->recno++;
1669                 } while (indx < NUM_ENT(pg));
1670                 if (no_dup)
1671                         break;
1672                 cp->indx = indx;
1673
1674         } while (ret == 0);
1675
1676         /* Return the updated information. */
1677         *spacep = space;
1678         *offpp = offp;
1679         *dpp = np;
1680
1681         /*
1682          * If we ran out of space back up the pointer.
1683          * If we did not return any dups or reached the end, close the opd.
1684          */
1685         if (ret == DB_BUFFER_SMALL) {
1686                 if (opd->dbtype == DB_RECNO) {
1687                         if (--cp->recno == 0)
1688                                 goto close_opd;
1689                 } else if (indx != 0)
1690                         cp->indx--;
1691                 else {
1692                         t_ret = __bamc_prev(opd);
1693                         if (t_ret == DB_NOTFOUND)
1694                                 goto close_opd;
1695                         if (t_ret != 0)
1696                                 ret = t_ret;
1697                 }
1698         } else if (keyoff == NULL && ret == DB_NOTFOUND) {
1699                 cp->indx--;
1700                 if (opd->dbtype == DB_RECNO)
1701                         --cp->recno;
1702         } else if (indx == 0 || ret == DB_NOTFOUND) {
1703 close_opd:
1704                 if (ret == DB_NOTFOUND)
1705                         ret = 0;
1706                 if ((t_ret = __dbc_close(opd)) != 0 && ret == 0)
1707                         ret = t_ret;
1708                 ((BTREE_CURSOR *)dbc->internal)->opd = NULL;
1709         }
1710         if (ret == DB_NOTFOUND)
1711                 ret = 0;
1712
1713         return (ret);
1714 }
1715
1716 /*
1717  * __bam_getbothc --
1718  *      Search for a matching data item on a join.
1719  */
1720 static int
1721 __bam_getbothc(dbc, data)
1722         DBC *dbc;
1723         DBT *data;
1724 {
1725         BTREE_CURSOR *cp;
1726         DB *dbp;
1727         DB_MPOOLFILE *mpf;
1728         int cmp, exact, ret;
1729
1730         dbp = dbc->dbp;
1731         mpf = dbp->mpf;
1732         cp = (BTREE_CURSOR *)dbc->internal;
1733
1734         /*
1735          * Acquire the current page.  We have at least a read-lock
1736          * already.  The caller may have set DB_RMW asking for a
1737          * write lock, but upgrading to a write lock has no better
1738          * chance of succeeding now instead of later, so don't try.
1739          */
1740         if ((ret = __memp_fget(mpf, &cp->pgno,
1741              dbc->thread_info, dbc->txn, 0, &cp->page)) != 0)
1742                 return (ret);
1743
1744         /*
1745          * An off-page duplicate cursor.  Search the remaining duplicates
1746          * for one which matches (do a normal btree search, then verify
1747          * that the retrieved record is greater than the original one).
1748          */
1749         if (F_ISSET(dbc, DBC_OPD)) {
1750                 /*
1751                  * Check to make sure the desired item comes strictly after
1752                  * the current position;  if it doesn't, return DB_NOTFOUND.
1753                  */
1754                 if ((ret = __bam_cmp(dbc, data, cp->page, cp->indx,
1755                     dbp->dup_compare == NULL ? __bam_defcmp : dbp->dup_compare,
1756                     &cmp)) != 0)
1757                         return (ret);
1758
1759                 if (cmp <= 0)
1760                         return (DB_NOTFOUND);
1761
1762                 /* Discard the current page, we're going to do a full search. */
1763                 if ((ret = __memp_fput(mpf,
1764                      dbc->thread_info, cp->page, dbc->priority)) != 0)
1765                         return (ret);
1766                 cp->page = NULL;
1767
1768                 return (__bamc_search(dbc,
1769                     PGNO_INVALID, data, DB_GET_BOTH, &exact));
1770         }
1771
1772         /*
1773          * We're doing a DBC->get(DB_GET_BOTHC) and we're already searching
1774          * a set of on-page duplicates (either sorted or unsorted).  Continue
1775          * a linear search from after the current position.
1776          *
1777          * (Note that we could have just finished a "set" of one duplicate,
1778          * i.e. not a duplicate at all, but the following check will always
1779          * return DB_NOTFOUND in this case, which is the desired behavior.)
1780          */
1781         if (cp->indx + P_INDX >= NUM_ENT(cp->page) ||
1782             !IS_DUPLICATE(dbc, cp->indx, cp->indx + P_INDX))
1783                 return (DB_NOTFOUND);
1784         cp->indx += P_INDX;
1785
1786         return (__bam_getboth_finddatum(dbc, data, DB_GET_BOTH));
1787 }
1788
1789 #ifdef HAVE_COMPRESSION
1790 /*
1791  * __bam_getlte --
1792  *      Search for the largest entry <= key/data - used by compression.
1793  *
1794  *      data == NULL indicates the DB_SET_LTE flag
1795  *      data != NULL indicates the DB_GET_BOTH_LTE flag
1796  *
1797  *      Only works for a primary cursor - not an OPD cursor. Handles the
1798  *      OPD manipulation as well - no need to return to the caller to
1799  *      perform more OPD movements.
1800  */
1801 static int
1802 __bam_getlte(dbc, key, data)
1803         DBC *dbc;
1804         DBT *key, *data;
1805 {
1806         BTREE_CURSOR *cp, *ocp;
1807         DB *dbp;
1808         db_pgno_t pgno;
1809         int exact, ret;
1810
1811         dbp = dbc->dbp;
1812         cp = (BTREE_CURSOR *)dbc->internal;
1813
1814         /* Begin by searching for the key */
1815         ret = __bamc_search(dbc, PGNO_INVALID, key, DB_SET_RANGE, &exact);
1816         if (ret == DB_NOTFOUND)
1817                 goto find_last;
1818         if (ret != 0)
1819                 goto end;
1820
1821         if (cp->indx == NUM_ENT(cp->page) || IS_CUR_DELETED(dbc)) {
1822                 /*
1823                  * Move to the next entry if we're past the end of the
1824                  * page or on a deleted entry.
1825                  */
1826                 ret = __bamc_next(dbc, 0, 0);
1827                 if (ret == DB_NOTFOUND)
1828                         goto find_last;
1829                 if (ret != 0)
1830                         goto end;
1831
1832                 /* Check if we're still on the correct key */
1833                 if ((ret = __bam_cmp(dbc, key, cp->page, cp->indx,
1834                     ((BTREE*)dbp->bt_internal)->bt_compare, &exact)) != 0)
1835                         goto end;
1836                 exact = (exact == 0);
1837         }
1838
1839         if (exact == 0) {
1840                 ret = __bam_get_prev(dbc);
1841                 goto end;
1842         }
1843
1844         if (__bam_isopd(dbc, &pgno)) {
1845                 /*
1846                  * We want to do unusual things with off-page duplicates, so
1847                  * deal with them here rather than returning to handle them.
1848                  */
1849                 if ((ret = __dbc_newopd(dbc, pgno, cp->opd, &cp->opd)) != 0)
1850                         goto end;
1851
1852                 /* Search for the correct duplicate */
1853                 ret = __bamc_search(cp->opd, PGNO_INVALID, data,
1854                         data == NULL ? DB_FIRST : DB_SET_RANGE, &exact);
1855                 if (ret == DB_NOTFOUND)
1856                         goto find_last_dup;
1857                 if (ret != 0)
1858                         goto end;
1859
1860                 ocp = (BTREE_CURSOR *)cp->opd->internal;
1861                 if (ocp->indx == NUM_ENT(ocp->page) ||
1862                     IS_CUR_DELETED(cp->opd)) {
1863                         /*
1864                          * Move to the next entry if we're past the end of the
1865                          * page or on a deleted entry.
1866                          */
1867                         ret = __bamc_next(cp->opd, 0, 0);
1868                         if (ret == DB_NOTFOUND)
1869                                 goto find_last_dup;
1870                         if (ret != 0)
1871                                 goto end;
1872
1873                         if (data != NULL) {
1874                                 /* Check if we're still on the correct data */
1875                                 if ((ret = __bam_cmp(
1876                                             dbc, data, ocp->page, ocp->indx,
1877                                             dbp->dup_compare, &exact)) != 0)
1878                                         goto end;
1879                                 exact = (exact == 0);
1880                         } else
1881                                 exact = 1;
1882                 }
1883
1884                 if (exact == 0) {
1885                         /* Move to the previous entry */
1886                         ret = __bamc_prev(cp->opd);
1887                         if (ret == DB_NOTFOUND) {
1888                                 if ((ret = __dbc_close(cp->opd)) != 0)
1889                                         goto end;
1890                                 cp->opd = NULL;
1891                                 ret = __bam_get_prev(dbc);
1892                         }
1893                 }
1894         } else if(data != NULL) {
1895                 /*
1896                  * If we got an exact match with on-page duplicates, we need to
1897                  * search in them.
1898                  */
1899                 ret = __bam_getboth_finddatum(dbc, data, DB_GET_BOTH_RANGE);
1900                 if (ret == DB_NOTFOUND)
1901                         exact = 0;
1902                 else if (ret != 0)
1903                         goto end;
1904                 else {
1905                         /* Check if we're still on the correct data */
1906                         if ((ret = __bam_cmp(dbc, data, cp->page,
1907                             cp->indx + O_INDX, dbp->dup_compare, &exact)) != 0)
1908                                 goto end;
1909                         exact = (exact == 0);
1910                 }
1911
1912                 if (exact == 0) {
1913                         ret = __bam_get_prev(dbc);
1914                 }
1915         }
1916
1917  end:
1918         return (ret);
1919
1920  find_last:
1921         if ((ret = __bamc_search(
1922             dbc, PGNO_INVALID, NULL, DB_LAST, &exact)) != 0)
1923                 return (ret);
1924
1925         if (__bam_isopd(dbc, &pgno)) {
1926                 if ((ret = __dbc_newopd(dbc, pgno, cp->opd, &cp->opd)) != 0)
1927                         return (ret);
1928  find_last_dup:
1929                 if ((ret = __bamc_search(
1930                     cp->opd, PGNO_INVALID, NULL, DB_LAST, &exact)) != 0)
1931                         return (ret);
1932         }
1933
1934         return (ret);
1935 }
1936 #endif
1937
1938 /*
1939  * __bam_getboth_finddatum --
1940  *      Find a matching on-page data item.
1941  */
1942 static int
1943 __bam_getboth_finddatum(dbc, data, flags)
1944         DBC *dbc;
1945         DBT *data;
1946         u_int32_t flags;
1947 {
1948         BTREE_CURSOR *cp;
1949         DB *dbp;
1950         db_indx_t base, lim, top;
1951         int cmp, ret;
1952
1953         COMPQUIET(cmp, 0);
1954
1955         dbp = dbc->dbp;
1956         cp = (BTREE_CURSOR *)dbc->internal;
1957
1958         /*
1959          * Called (sometimes indirectly) from DBC->get to search on-page data
1960          * item(s) for a matching value.  If the original flag was DB_GET_BOTH
1961          * or DB_GET_BOTH_RANGE, the cursor is set to the first undeleted data
1962          * item for the key.  If the original flag was DB_GET_BOTHC, the cursor
1963          * argument is set to the first data item we can potentially return.
1964          * In both cases, there may or may not be additional duplicate data
1965          * items to search.
1966          *
1967          * If the duplicates are not sorted, do a linear search.
1968          */
1969         if (dbp->dup_compare == NULL) {
1970                 for (;; cp->indx += P_INDX) {
1971                         if (!IS_CUR_DELETED(dbc) &&
1972                             (ret = __bam_cmp(dbc, data, cp->page,
1973                             cp->indx + O_INDX, __bam_defcmp, &cmp)) != 0)
1974                                 return (ret);
1975                         if (cmp == 0)
1976                                 return (0);
1977
1978                         if (cp->indx + P_INDX >= NUM_ENT(cp->page) ||
1979                             !IS_DUPLICATE(dbc, cp->indx, cp->indx + P_INDX))
1980                                 break;
1981                 }
1982                 return (DB_NOTFOUND);
1983         }
1984
1985         /*
1986          * If the duplicates are sorted, do a binary search.  The reason for
1987          * this is that large pages and small key/data pairs result in large
1988          * numbers of on-page duplicates before they get pushed off-page.
1989          *
1990          * Find the top and bottom of the duplicate set.  Binary search
1991          * requires at least two items, don't loop if there's only one.
1992          */
1993         for (base = top = cp->indx; top < NUM_ENT(cp->page); top += P_INDX)
1994                 if (!IS_DUPLICATE(dbc, cp->indx, top))
1995                         break;
1996         if (base == (top - P_INDX)) {
1997                 if  ((ret = __bam_cmp(dbc, data, cp->page,
1998                     cp->indx + O_INDX, dbp->dup_compare, &cmp)) != 0)
1999                         return (ret);
2000                 if (cmp == 0 || (cmp < 0 && flags == DB_GET_BOTH_RANGE))
2001                         return 0;
2002                 cp->indx = top;
2003                 return DB_NOTFOUND;
2004         }
2005
2006         for (lim = (top - base) / (db_indx_t)P_INDX; lim != 0; lim >>= 1) {
2007                 cp->indx = base + ((lim >> 1) * P_INDX);
2008                 if ((ret = __bam_cmp(dbc, data, cp->page,
2009                     cp->indx + O_INDX, dbp->dup_compare, &cmp)) != 0)
2010                         return (ret);
2011                 if (cmp == 0) {
2012                         /*
2013                          * XXX
2014                          * No duplicate duplicates in sorted duplicate sets,
2015                          * so there can be only one.
2016                          */
2017                         if (!IS_CUR_DELETED(dbc))
2018                                 return (0);
2019                         break;
2020                 }
2021                 if (cmp > 0) {
2022                         base = cp->indx + P_INDX;
2023                         --lim;
2024                 }
2025         }
2026
2027         /* No match found; if we're looking for an exact match, we're done. */
2028         if (flags == DB_GET_BOTH)
2029                 return (DB_NOTFOUND);
2030
2031         /*
2032          * Base is the smallest index greater than the data item, may be zero
2033          * or a last + O_INDX index, and may be deleted.  Find an undeleted
2034          * item.
2035          */
2036         cp->indx = base;
2037         while (cp->indx < top && IS_CUR_DELETED(dbc))
2038                 cp->indx += P_INDX;
2039         return (cp->indx < top ? 0 : DB_NOTFOUND);
2040 }
2041
2042 /*
2043  * __bamc_put --
2044  *      Put using a cursor.
2045  */
2046 static int
2047 __bamc_put(dbc, key, data, flags, pgnop)
2048         DBC *dbc;
2049         DBT *key, *data;
2050         u_int32_t flags;
2051         db_pgno_t *pgnop;
2052 {
2053         BTREE *t;
2054         BTREE_CURSOR *cp;
2055         DB *dbp;
2056         DBT dbt;
2057         DB_MPOOLFILE *mpf;
2058         db_pgno_t root_pgno;
2059         int cmp, exact, own, ret, stack;
2060         u_int32_t iiop;
2061         void *arg;
2062
2063         dbp = dbc->dbp;
2064         mpf = dbp->mpf;
2065         cp = (BTREE_CURSOR *)dbc->internal;
2066         root_pgno = cp->root;
2067
2068 split:  ret = stack = 0;
2069         switch (flags) {
2070         case DB_CURRENT:
2071                 if (F_ISSET(cp, C_DELETED))
2072                         return (DB_NOTFOUND);
2073                 /* FALLTHROUGH */
2074         case DB_AFTER:
2075         case DB_BEFORE:
2076                 iiop = flags;
2077                 own = 1;
2078
2079                 /* Acquire the current page with a write lock. */
2080                 ACQUIRE_WRITE_LOCK(dbc, ret);
2081                 if (ret != 0)
2082                         goto err;
2083                 if (cp->page == NULL && (ret = __memp_fget(mpf, &cp->pgno,
2084                     dbc->thread_info, dbc->txn, 0, &cp->page)) != 0)
2085                         goto err;
2086                 break;
2087         case DB_KEYFIRST:
2088         case DB_KEYLAST:
2089         case DB_NODUPDATA:
2090         case DB_NOOVERWRITE:
2091         case DB_OVERWRITE_DUP:
2092                 own = 0;
2093                 /*
2094                  * Searching off-page, sorted duplicate tree: do a tree search
2095                  * for the correct item; __bamc_search returns the smallest
2096                  * slot greater than the key, use it.
2097                  *
2098                  * See comment below regarding where we can start the search.
2099                  */
2100                 if (F_ISSET(dbc, DBC_OPD)) {
2101                         if ((ret = __bamc_search(dbc,
2102                             F_ISSET(cp, C_RECNUM) ? cp->root : root_pgno,
2103                             data, flags, &exact)) != 0)
2104                                 goto err;
2105                         stack = 1;
2106
2107                         /* Disallow "sorted" duplicate duplicates. */
2108                         if (exact != 0) {
2109                                 if (flags == DB_OVERWRITE_DUP ||
2110                                     IS_DELETED(dbp, cp->page, cp->indx)) {
2111                                         iiop = DB_CURRENT;
2112                                         break;
2113                                 }
2114                                 ret = __db_duperr(dbp, flags);
2115                                 goto err;
2116                         }
2117                         iiop = DB_BEFORE;
2118                         break;
2119                 }
2120
2121                 /*
2122                  * Searching a btree.
2123                  *
2124                  * If we've done a split, we can start the search from the
2125                  * parent of the split page, which __bam_split returned
2126                  * for us in root_pgno, unless we're in a Btree with record
2127                  * numbering.  In that case, we'll need the true root page
2128                  * in order to adjust the record count.
2129                  */
2130                 if ((ret = __bamc_search(dbc,
2131                     F_ISSET(cp, C_RECNUM) ? cp->root : root_pgno, key,
2132                     flags == DB_KEYFIRST || dbp->dup_compare != NULL ?
2133                     DB_KEYFIRST : DB_KEYLAST, &exact)) != 0)
2134                         goto err;
2135                 stack = 1;
2136
2137                 /*
2138                  * If we don't have an exact match, __bamc_search returned
2139                  * the smallest slot greater than the key, use it.
2140                  */
2141                 if (!exact) {
2142                         iiop = DB_KEYFIRST;
2143                         break;
2144
2145                 /*
2146                  * Check for NOOVERWRITE.  It is possible that there
2147                  * is a key with an empty duplicate page attached.
2148                  */
2149                 } else if (flags == DB_NOOVERWRITE && !IS_CUR_DELETED(dbc)) {
2150                         if (pgnop != NULL && __bam_isopd(dbc, pgnop))
2151                                 ret = __bam_opd_exists(dbc, *pgnop);
2152                         else
2153                                 ret = DB_KEYEXIST;
2154                         if (ret != 0)
2155                                 goto err;
2156                 }
2157
2158                 /*
2159                  * If duplicates aren't supported, replace the current item.
2160                  */
2161                 if (!F_ISSET(dbp, DB_AM_DUP)) {
2162                         iiop = DB_CURRENT;
2163                         break;
2164                 }
2165
2166                 /*
2167                  * If we find a matching entry, it may be an off-page duplicate
2168                  * tree.  Return the page number to our caller, we need a new
2169                  * cursor.
2170                  */
2171                 if (pgnop != NULL && __bam_isopd(dbc, pgnop))
2172                         goto done;
2173
2174                 /* If the duplicates aren't sorted, move to the right slot. */
2175                 if (dbp->dup_compare == NULL) {
2176                         if (flags == DB_KEYFIRST)
2177                                 iiop = DB_BEFORE;
2178                         else
2179                                 for (;; cp->indx += P_INDX)
2180                                         if (cp->indx + P_INDX >=
2181                                             NUM_ENT(cp->page) ||
2182                                             !IS_DUPLICATE(dbc, cp->indx,
2183                                             cp->indx + P_INDX)) {
2184                                                 iiop = DB_AFTER;
2185                                                 break;
2186                                         }
2187                         break;
2188                 }
2189
2190                 /*
2191                  * We know that we're looking at the first of a set of sorted
2192                  * on-page duplicates.  Walk the list to find the right slot.
2193                  */
2194                 for (;; cp->indx += P_INDX) {
2195                         if ((ret = __bam_cmp(dbc, data, cp->page,
2196                             cp->indx + O_INDX, dbp->dup_compare, &cmp)) != 0)
2197                                 goto err;
2198                         if (cmp < 0) {
2199                                 iiop = DB_BEFORE;
2200                                 break;
2201                         }
2202
2203                         /* Disallow "sorted" duplicate duplicates. */
2204                         if (cmp == 0) {
2205                                 if (flags == DB_OVERWRITE_DUP ||
2206                                     IS_DELETED(dbp, cp->page, cp->indx)) {
2207                                         iiop = DB_CURRENT;
2208                                         break;
2209                                 }
2210                                 ret = __db_duperr(dbp, flags);
2211                                 goto err;
2212                         }
2213
2214                         if (cp->indx + P_INDX >= NUM_ENT(cp->page) ||
2215                             P_INP(dbp, ((PAGE *)cp->page))[cp->indx] !=
2216                             P_INP(dbp, ((PAGE *)cp->page))[cp->indx + P_INDX]) {
2217                                 iiop = DB_AFTER;
2218                                 break;
2219                         }
2220                 }
2221                 break;
2222         default:
2223                 ret = __db_unknown_flag(dbp->env, "__bamc_put", flags);
2224                 goto err;
2225         }
2226
2227         switch (ret = __bam_iitem(dbc, key, data, iiop, 0)) {
2228         case 0:
2229                 break;
2230         case DB_NEEDSPLIT:
2231                 /*
2232                  * To split, we need a key for the page.  Either use the key
2233                  * argument or get a copy of the key from the page.
2234                  */
2235                 if (flags == DB_AFTER ||
2236                     flags == DB_BEFORE || flags == DB_CURRENT) {
2237                         memset(&dbt, 0, sizeof(DBT));
2238                         if ((ret = __db_ret(dbc, cp->page, 0, &dbt,
2239                             &dbc->my_rkey.data, &dbc->my_rkey.ulen)) != 0)
2240                                 goto err;
2241                         arg = &dbt;
2242                 } else
2243                         arg = F_ISSET(dbc, DBC_OPD) ? data : key;
2244
2245                 /*
2246                  * Discard any locks and pinned pages (the locks are discarded
2247                  * even if we're running with transactions, as they lock pages
2248                  * that we're sorry we ever acquired).  If stack is set and the
2249                  * cursor entries are valid, they point to the same entries as
2250                  * the stack, don't free them twice.
2251                  */
2252                 if (stack)
2253                         ret = __bam_stkrel(dbc, STK_CLRDBC | STK_NOLOCK);
2254                 else
2255                         DISCARD_CUR(dbc, ret);
2256                 if (ret != 0)
2257                         goto err;
2258
2259                 /*
2260                  * SR [#6059]
2261                  * If we do not own a lock on the page any more, then clear the
2262                  * cursor so we don't point at it.  Even if we call __bam_stkrel
2263                  * above we still may have entered the routine with the cursor
2264                  * positioned to a particular record.  This is in the case
2265                  * where C_RECNUM is set.
2266                  */
2267                 if (own == 0) {
2268                         cp->pgno = PGNO_INVALID;
2269                         cp->indx = 0;
2270                 }
2271
2272                 /* Split the tree. */
2273                 if ((ret = __bam_split(dbc, arg, &root_pgno)) != 0)
2274                         return (ret);
2275
2276                 goto split;
2277         default:
2278                 goto err;
2279         }
2280
2281 err:
2282 done:   /*
2283          * If we inserted a key into the first or last slot of the tree,
2284          * remember where it was so we can do it more quickly next time.
2285          * If the tree has record numbers, we need a complete stack so
2286          * that we can adjust the record counts, so skipping the tree search
2287          * isn't possible.  For subdatabases we need to be careful that the
2288          * page does not move from one db to another, so we track its LSN.
2289          *
2290          * If there are duplicates and we are inserting into the last slot,
2291          * the cursor will point _to_ the last item, not after it, which
2292          * is why we subtract P_INDX below.
2293          */
2294
2295         t = dbp->bt_internal;
2296         if (ret == 0 && TYPE(cp->page) == P_LBTREE &&
2297             (flags == DB_KEYFIRST || flags == DB_KEYLAST) &&
2298             !F_ISSET(cp, C_RECNUM) &&
2299             (!F_ISSET(dbp, DB_AM_SUBDB) ||
2300             (LOGGING_ON(dbp->env) && !F_ISSET(dbp, DB_AM_NOT_DURABLE))) &&
2301             ((NEXT_PGNO(cp->page) == PGNO_INVALID &&
2302             cp->indx >= NUM_ENT(cp->page) - P_INDX) ||
2303             (PREV_PGNO(cp->page) == PGNO_INVALID && cp->indx == 0))) {
2304                 t->bt_lpgno = cp->pgno;
2305                 if (F_ISSET(dbp, DB_AM_SUBDB))
2306                         t->bt_llsn = LSN(cp->page);
2307         } else
2308                 t->bt_lpgno = PGNO_INVALID;
2309         /*
2310          * Discard any pages pinned in the tree and their locks, except for
2311          * the leaf page.  Note, the leaf page participated in any stack we
2312          * acquired, and so we have to adjust the stack as necessary.  If
2313          * there was only a single page on the stack, we don't have to free
2314          * further stack pages.
2315          */
2316         if (stack && BT_STK_POP(cp) != NULL)
2317                 (void)__bam_stkrel(dbc, 0);
2318
2319         /*
2320          * Regardless of whether we were successful or not, clear the delete
2321          * flag.  If we're successful, we either moved the cursor or the item
2322          * is no longer deleted.  If we're not successful, then we're just a
2323          * copy, no need to have the flag set.
2324          *
2325          * We may have instantiated off-page duplicate cursors during the put,
2326          * so clear the deleted bit from the off-page duplicate cursor as well.
2327          */
2328         F_CLR(cp, C_DELETED);
2329         if (cp->opd != NULL) {
2330                 cp = (BTREE_CURSOR *)cp->opd->internal;
2331                 F_CLR(cp, C_DELETED);
2332         }
2333
2334         return (ret);
2335 }
2336
2337 /*
2338  * __bamc_rget --
2339  *      Return the record number for a cursor.
2340  *
2341  * PUBLIC: int __bamc_rget __P((DBC *, DBT *));
2342  */
2343 int
2344 __bamc_rget(dbc, data)
2345         DBC *dbc;
2346         DBT *data;
2347 {
2348         BTREE_CURSOR *cp;
2349         DB *dbp;
2350         DBT dbt;
2351         DB_MPOOLFILE *mpf;
2352         db_recno_t recno;
2353         int exact, ret, t_ret;
2354
2355         dbp = dbc->dbp;
2356         mpf = dbp->mpf;
2357         cp = (BTREE_CURSOR *)dbc->internal;
2358
2359         /*
2360          * Get the page with the current item on it.
2361          * Get a copy of the key.
2362          * Release the page, making sure we don't release it twice.
2363          */
2364         if ((ret = __memp_fget(mpf, &cp->pgno,
2365              dbc->thread_info, dbc->txn, 0, &cp->page)) != 0)
2366                 return (ret);
2367         memset(&dbt, 0, sizeof(DBT));
2368         if ((ret = __db_ret(dbc, cp->page, cp->indx, &dbt,
2369             &dbc->my_rkey.data, &dbc->my_rkey.ulen)) != 0)
2370                 goto err;
2371         ret = __memp_fput(mpf, dbc->thread_info, cp->page, dbc->priority);
2372         cp->page = NULL;
2373         if (ret != 0)
2374                 return (ret);
2375
2376         if ((ret = __bam_search(dbc, PGNO_INVALID, &dbt,
2377             F_ISSET(dbc, DBC_RMW) ? SR_FIND_WR : SR_FIND,
2378             1, &recno, &exact)) != 0)
2379                 goto err;
2380
2381         ret = __db_retcopy(dbc->env, data,
2382             &recno, sizeof(recno), &dbc->rdata->data, &dbc->rdata->ulen);
2383
2384         /* Release the stack. */
2385 err:    if ((t_ret = __bam_stkrel(dbc, 0)) != 0 && ret == 0)
2386                 ret = t_ret;
2387
2388         return (ret);
2389 }
2390
2391 /*
2392  * __bamc_writelock --
2393  *      Upgrade the cursor to a write lock.
2394  */
2395 static int
2396 __bamc_writelock(dbc)
2397         DBC *dbc;
2398 {
2399         BTREE_CURSOR *cp;
2400         int ret;
2401
2402         cp = (BTREE_CURSOR *)dbc->internal;
2403
2404         if (cp->lock_mode == DB_LOCK_WRITE)
2405                 return (0);
2406
2407         /*
2408          * When writing to an off-page duplicate tree, we need to have the
2409          * appropriate page in the primary tree locked.  The general DBC
2410          * code calls us first with the primary cursor so we can acquire the
2411          * appropriate lock.
2412          */
2413         ACQUIRE_WRITE_LOCK(dbc, ret);
2414         return (ret);
2415 }
2416
2417 /*
2418  * __bamc_next --
2419  *      Move to the next record.
2420  */
2421 static int
2422 __bamc_next(dbc, initial_move, deleted_okay)
2423         DBC *dbc;
2424         int initial_move, deleted_okay;
2425 {
2426         BTREE_CURSOR *cp;
2427         db_indx_t adjust;
2428         db_lockmode_t lock_mode;
2429         db_pgno_t pgno;
2430         int ret;
2431
2432         cp = (BTREE_CURSOR *)dbc->internal;
2433         ret = 0;
2434
2435         /*
2436          * We're either moving through a page of duplicates or a btree leaf
2437          * page.
2438          *
2439          * !!!
2440          * This code handles empty pages and pages with only deleted entries.
2441          */
2442         if (F_ISSET(dbc, DBC_OPD)) {
2443                 adjust = O_INDX;
2444                 lock_mode = DB_LOCK_NG;
2445         } else {
2446                 adjust = dbc->dbtype == DB_BTREE ? P_INDX : O_INDX;
2447                 lock_mode =
2448                     F_ISSET(dbc, DBC_RMW) ? DB_LOCK_WRITE : DB_LOCK_READ;
2449         }
2450         if (cp->page == NULL) {
2451                 ACQUIRE_CUR(dbc, lock_mode, cp->pgno, 0, ret);
2452                 if (ret != 0)
2453                         return (ret);
2454         }
2455
2456         if (initial_move)
2457                 cp->indx += adjust;
2458
2459         for (;;) {
2460                 /*
2461                  * If at the end of the page, move to a subsequent page.
2462                  *
2463                  * !!!
2464                  * Check for >= NUM_ENT.  If the original search landed us on
2465                  * NUM_ENT, we may have incremented indx before the test.
2466                  */
2467                 if (cp->indx >= NUM_ENT(cp->page)) {
2468                         if ((pgno = NEXT_PGNO(cp->page)) == PGNO_INVALID)
2469                                 return (DB_NOTFOUND);
2470
2471                         ACQUIRE_CUR(dbc, lock_mode, pgno, 0, ret);
2472                         if (ret != 0)
2473                                 return (ret);
2474                         cp->indx = 0;
2475                         continue;
2476                 }
2477                 if (!deleted_okay && IS_CUR_DELETED(dbc)) {
2478                         cp->indx += adjust;
2479                         continue;
2480                 }
2481                 break;
2482         }
2483         return (0);
2484 }
2485
2486 /*
2487  * __bamc_prev --
2488  *      Move to the previous record.
2489  */
2490 static int
2491 __bamc_prev(dbc)
2492         DBC *dbc;
2493 {
2494         BTREE_CURSOR *cp;
2495         db_indx_t adjust;
2496         db_lockmode_t lock_mode;
2497         db_pgno_t pgno;
2498         int ret;
2499
2500         cp = (BTREE_CURSOR *)dbc->internal;
2501         ret = 0;
2502
2503         /*
2504          * We're either moving through a page of duplicates or a btree leaf
2505          * page.
2506          *
2507          * !!!
2508          * This code handles empty pages and pages with only deleted entries.
2509          */
2510         if (F_ISSET(dbc, DBC_OPD)) {
2511                 adjust = O_INDX;
2512                 lock_mode = DB_LOCK_NG;
2513         } else {
2514                 adjust = dbc->dbtype == DB_BTREE ? P_INDX : O_INDX;
2515                 lock_mode =
2516                     F_ISSET(dbc, DBC_RMW) ? DB_LOCK_WRITE : DB_LOCK_READ;
2517         }
2518         if (cp->page == NULL) {
2519                 ACQUIRE_CUR(dbc, lock_mode, cp->pgno, 0, ret);
2520                 if (ret != 0)
2521                         return (ret);
2522         }
2523
2524         for (;;) {
2525                 /* If at the beginning of the page, move to a previous one. */
2526                 if (cp->indx == 0) {
2527                         if ((pgno =
2528                             PREV_PGNO(cp->page)) == PGNO_INVALID)
2529                                 return (DB_NOTFOUND);
2530
2531                         ACQUIRE_CUR(dbc, lock_mode, pgno, 0, ret);
2532                         if (ret != 0)
2533                                 return (ret);
2534
2535                         if ((cp->indx = NUM_ENT(cp->page)) == 0)
2536                                 continue;
2537                 }
2538
2539                 /* Ignore deleted records. */
2540                 cp->indx -= adjust;
2541                 if (IS_CUR_DELETED(dbc))
2542                         continue;
2543
2544                 break;
2545         }
2546         return (0);
2547 }
2548
2549 /*
2550  * __bamc_search --
2551  *      Move to a specified record.
2552  */
2553 static int
2554 __bamc_search(dbc, root_pgno, key, flags, exactp)
2555         DBC *dbc;
2556         db_pgno_t root_pgno;
2557         const DBT *key;
2558         u_int32_t flags;
2559         int *exactp;
2560 {
2561         BTREE *t;
2562         BTREE_CURSOR *cp;
2563         DB *dbp;
2564         PAGE *h;
2565         db_indx_t base, indx, *inp, lim;
2566         db_pgno_t bt_lpgno;
2567         db_recno_t recno;
2568         u_int32_t sflags;
2569         int bulk, cmp, ret, t_ret;
2570
2571         COMPQUIET(cmp, 0);
2572         dbp = dbc->dbp;
2573         cp = (BTREE_CURSOR *)dbc->internal;
2574         t = dbp->bt_internal;
2575         ret = 0;
2576         bulk = (F_ISSET(dbc, DBC_BULK) && cp->pgno != PGNO_INVALID);
2577
2578         /*
2579          * Find an entry in the database.  Discard any lock we currently hold,
2580          * we're going to search the tree.
2581          */
2582         DISCARD_CUR(dbc, ret);
2583         if (ret != 0)
2584                 return (ret);
2585
2586         switch (flags) {
2587         case DB_FIRST:
2588                 sflags = (F_ISSET(dbc, DBC_RMW) ? SR_WRITE : SR_READ) | SR_MIN;
2589                 goto search;
2590         case DB_LAST:
2591                 sflags = (F_ISSET(dbc, DBC_RMW) ? SR_WRITE : SR_READ) | SR_MAX;
2592                 goto search;
2593         case DB_SET_RECNO:
2594                 if ((ret = __ram_getno(dbc, key, &recno, 0)) != 0)
2595                         return (ret);
2596                 sflags =
2597                     (F_ISSET(dbc, DBC_RMW) ?  SR_FIND_WR : SR_FIND) | SR_EXACT;
2598                 if ((ret = __bam_rsearch(dbc, &recno, sflags, 1, exactp)) != 0)
2599                         return (ret);
2600                 goto done;
2601         case DB_SET:
2602         case DB_GET_BOTH:
2603                 sflags =
2604                     (F_ISSET(dbc, DBC_RMW) ? SR_FIND_WR : SR_FIND) | SR_EXACT;
2605                 if (bulk)
2606                         break;
2607                 goto search;
2608         case DB_GET_BOTH_RANGE:
2609                 sflags = (F_ISSET(dbc, DBC_RMW) ? SR_FIND_WR : SR_FIND);
2610                 goto search;
2611         case DB_SET_RANGE:
2612                 sflags =
2613                     (F_ISSET(dbc, DBC_RMW) ? SR_WRITE : SR_READ) | SR_DUPFIRST;
2614                 goto search;
2615         case DB_KEYFIRST:
2616         case DB_NOOVERWRITE:
2617                 sflags = SR_KEYFIRST;
2618                 break;
2619         case DB_KEYLAST:
2620         case DB_NODUPDATA:
2621         case DB_OVERWRITE_DUP:
2622                 sflags = SR_KEYLAST;
2623                 break;
2624         default:
2625                 return (__db_unknown_flag(dbp->env, "__bamc_search", flags));
2626         }
2627
2628         /*
2629          * If the application has a history of inserting into the first or last
2630          * pages of the database, we check those pages first to avoid doing a
2631          * full search.  Similarly, if the cursor is configured as a bulk
2632          * cursor, check whether this operation belongs on the same page as the
2633          * last one.
2634          */
2635         if (bulk)
2636                 bt_lpgno = cp->pgno;
2637         else {
2638                 if (F_ISSET(dbc, DBC_OPD))
2639                         goto search;
2640
2641                 /*
2642                  * !!!
2643                  * We do not mutex protect the t->bt_lpgno field, which means
2644                  * that it can only be used in an advisory manner.  If we find
2645                  * page we can use, great.  If we don't, we don't care, we do
2646                  * it the slow way instead.  Regardless, copy it into a local
2647                  * variable, otherwise we might acquire a lock for a page and
2648                  * then read a different page because it changed underfoot.
2649                  */
2650                 bt_lpgno = t->bt_lpgno;
2651         }
2652
2653         /*
2654          * If the tree has no history of insertion, do it the slow way.
2655          */
2656         if (bt_lpgno == PGNO_INVALID)
2657                 goto search;
2658
2659         /*
2660          * Lock and retrieve the page on which we last inserted.
2661          *
2662          * The page may not exist: if a transaction created the page
2663          * and then aborted, the page might have been truncated from
2664          * the end of the file.  We don't want to wait on the lock.
2665          * The page may not even be relevant to this search.
2666          */
2667         h = NULL;
2668         ACQUIRE_CUR(dbc, DB_LOCK_WRITE, bt_lpgno, DB_LOCK_NOWAIT, ret);
2669         if (ret != 0) {
2670                 if (ret == DB_LOCK_DEADLOCK ||
2671                     ret == DB_LOCK_NOTGRANTED ||
2672                     ret == DB_PAGE_NOTFOUND)
2673                         ret = 0;
2674                 goto fast_miss;
2675         }
2676
2677         h = cp->page;
2678         inp = P_INP(dbp, h);
2679
2680         /*
2681          * It's okay if the page type isn't right or it's empty, it
2682          * just means that the world changed.
2683          */
2684         if (TYPE(h) != P_LBTREE || NUM_ENT(h) == 0)
2685                 goto fast_miss;
2686
2687         /* Verify that this page cannot have moved to another db. */
2688         if (F_ISSET(dbp, DB_AM_SUBDB) &&
2689             LOG_COMPARE(&t->bt_llsn, &LSN(h)) != 0)
2690                 goto fast_miss;
2691
2692         /*
2693          * What we do here is test to see if we're at the beginning or
2694          * end of the tree and if the new item sorts before/after the
2695          * first/last page entry.  We only try to catch inserts into
2696          * the middle of the tree for bulk cursors.
2697          */
2698         if (h->next_pgno == PGNO_INVALID) {
2699                 indx = NUM_ENT(h) - P_INDX;
2700                 if ((ret = __bam_cmp(dbc, key, h, indx,
2701                     t->bt_compare, &cmp)) != 0)
2702                         goto fast_miss;
2703                 if (cmp > 0)
2704                         indx += P_INDX;
2705                 if (cmp >= 0)
2706                         goto fast_hit;
2707         }
2708         if (h->prev_pgno == PGNO_INVALID) {
2709                 indx = 0;
2710                 if ((ret = __bam_cmp(dbc, key, h, indx,
2711                     t->bt_compare, &cmp)) != 0)
2712                         goto fast_miss;
2713                 if (cmp <= 0)
2714                         goto fast_hit;
2715         }
2716         if (bulk) {
2717                 DB_BINARY_SEARCH_FOR(base, lim, NUM_ENT(h), P_INDX) {
2718                         DB_BINARY_SEARCH_INCR(indx, base, lim, P_INDX);
2719                         if ((ret = __bam_cmp(dbc, key, h, indx,
2720                             t->bt_compare, &cmp)) != 0)
2721                                 goto fast_miss;
2722
2723                         if (cmp == 0)
2724                                 goto fast_hit;
2725                         if (cmp > 0)
2726                                 DB_BINARY_SEARCH_SHIFT_BASE(indx, base,
2727                                     lim, P_INDX);
2728                 }
2729                 /*
2730                  * No match found: base is the smallest index greater than
2731                  * the key and may be zero or NUM_ENT(h).
2732                  */
2733                 indx = base;
2734                 if (indx > 0 && indx < NUM_ENT(h)) {
2735                         if (FLD_ISSET(sflags, SR_EXACT))
2736                                 return (DB_NOTFOUND);
2737                         goto fast_hit;
2738                 }
2739         }
2740         goto fast_miss;
2741
2742 fast_hit:
2743         if (cmp == 0) {
2744                 /*
2745                  * Found a duplicate.  Deal with DB_KEYFIRST / DB_KEYLAST.
2746                  */
2747                 if (FLD_ISSET(sflags, SR_DUPFIRST))
2748                         while (indx > 0 && inp[indx - P_INDX] == inp[indx])
2749                                 indx -= P_INDX;
2750                 else if (FLD_ISSET(sflags, SR_DUPLAST))
2751                         while (indx < (db_indx_t)(NUM_ENT(h) - P_INDX) &&
2752                             inp[indx] == inp[indx + P_INDX])
2753                                 indx += P_INDX;
2754         }
2755
2756         /* Set the exact match flag, we may have found a duplicate. */
2757         *exactp = (cmp == 0);
2758
2759         /*
2760          * Insert the entry in the stack.  (Our caller is likely to
2761          * call __bam_stkrel() after our return.)
2762          */
2763         BT_STK_CLR(cp);
2764         BT_STK_ENTER(dbp->env,
2765             cp, h, indx, cp->lock, cp->lock_mode, ret);
2766         if (ret != 0)
2767                 return (ret);
2768         goto done;
2769
2770 fast_miss:
2771         /*
2772          * This was not the right page, so we do not need to retain
2773          * the lock even in the presence of transactions.
2774          *
2775          * This is also an error path, so ret may have been set.
2776          */
2777         DISCARD_CUR(dbc, ret);
2778         cp->pgno = PGNO_INVALID;
2779         if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0)
2780                 ret = t_ret;
2781         if (ret != 0)
2782                 return (ret);
2783
2784 search:
2785         if ((ret = __bam_search(dbc, root_pgno,
2786             key, sflags, 1, NULL, exactp)) != 0)
2787                 return (ret);
2788
2789 done:   /* Initialize the cursor from the stack. */
2790         cp->page = cp->csp->page;
2791         cp->pgno = cp->csp->page->pgno;
2792         cp->indx = cp->csp->indx;
2793         cp->lock = cp->csp->lock;
2794         cp->lock_mode = cp->csp->lock_mode;
2795
2796         /* If on an empty page or a deleted record, move to the next one. */
2797         if (flags == DB_FIRST &&
2798             (NUM_ENT(cp->page) == 0 || IS_CUR_DELETED(dbc)))
2799                 if ((ret = __bamc_next(dbc, 0, 0)) != 0)
2800                         return (ret);
2801         if (flags == DB_LAST &&
2802             (NUM_ENT(cp->page) == 0 || IS_CUR_DELETED(dbc)))
2803                 if ((ret = __bamc_prev(dbc)) != 0)
2804                         return (ret);
2805
2806         return (0);
2807 }
2808
2809 /*
2810  * __bamc_physdel --
2811  *      Physically remove an item from the page.
2812  */
2813 static int
2814 __bamc_physdel(dbc)
2815         DBC *dbc;
2816 {
2817         BTREE_CURSOR *cp;
2818         DB *dbp;
2819         DBT key;
2820         DB_LOCK next_lock, prev_lock;
2821         db_pgno_t pgno;
2822         int delete_page, empty_page, exact, ret;
2823
2824         dbp = dbc->dbp;
2825         memset(&key, 0, sizeof(DBT));
2826         cp = (BTREE_CURSOR *)dbc->internal;
2827         delete_page = empty_page = ret = 0;
2828         LOCK_INIT(next_lock);
2829         LOCK_INIT(prev_lock);
2830
2831         /* If the page is going to be emptied, consider deleting it. */
2832         delete_page = empty_page =
2833             NUM_ENT(cp->page) == (TYPE(cp->page) == P_LBTREE ? 2 : 1);
2834
2835         /*
2836          * Check if the application turned off reverse splits.  Applications
2837          * can't turn off reverse splits in off-page duplicate trees, that
2838          * space will never be reused unless the exact same key is specified.
2839          */
2840         if (delete_page &&
2841             !F_ISSET(dbc, DBC_OPD) && F_ISSET(dbp, DB_AM_REVSPLITOFF))
2842                 delete_page = 0;
2843
2844         /*
2845          * We never delete the last leaf page.  (Not really true -- we delete
2846          * the last leaf page of off-page duplicate trees, but that's handled
2847          * by our caller, not down here.)
2848          */
2849         if (delete_page && cp->pgno == cp->root)
2850                 delete_page = 0;
2851
2852         /*
2853          * To delete a leaf page other than an empty root page, we need a
2854          * copy of a key from the page.  Use the 0th page index since it's
2855          * the last key the page held.
2856          *
2857          * !!!
2858          * Note that because __bamc_physdel is always called from a cursor
2859          * close, it should be safe to use the cursor's own "my_rkey" memory
2860          * to temporarily hold this key.  We shouldn't own any returned-data
2861          * memory of interest--if we do, we're in trouble anyway.
2862          */
2863         if (delete_page) {
2864                 if ((ret = __db_ret(dbc, cp->page, 0, &key,
2865                     &dbc->my_rkey.data, &dbc->my_rkey.ulen)) != 0)
2866                         return (ret);
2867         }
2868
2869         /*
2870          * Delete the items.  If page isn't empty, we adjust the cursors.
2871          *
2872          * !!!
2873          * The following operations to delete a page may deadlock.  The easy
2874          * scenario is if we're deleting an item because we're closing cursors
2875          * because we've already deadlocked and want to call txn->abort.  If
2876          * we fail due to deadlock, we'll leave a locked, possibly empty page
2877          * in the tree, which won't be empty long because we'll undo the delete
2878          * when we undo the transaction's modifications.
2879          *
2880          * !!!
2881          * Delete the key item first, otherwise the on-page duplicate checks
2882          * in __bam_ditem() won't work!
2883          */
2884         if ((ret = __memp_dirty(dbp->mpf,
2885             &cp->page, dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
2886                 return (ret);
2887         if (TYPE(cp->page) == P_LBTREE) {
2888                 if ((ret = __bam_ditem(dbc, cp->page, cp->indx)) != 0)
2889                         return (ret);
2890                 if (!empty_page)
2891                         if ((ret = __bam_ca_di(dbc,
2892                             PGNO(cp->page), cp->indx, -1)) != 0)
2893                                 return (ret);
2894         }
2895         if ((ret = __bam_ditem(dbc, cp->page, cp->indx)) != 0)
2896                 return (ret);
2897
2898         /* Clear the deleted flag, the item is gone. */
2899         F_CLR(cp, C_DELETED);
2900
2901         if (!empty_page)
2902                 if ((ret = __bam_ca_di(dbc, PGNO(cp->page), cp->indx, -1)) != 0)
2903                         return (ret);
2904
2905         /*
2906          * Need to downgrade write locks here or non-txn locks will get stuck.
2907          */
2908         if (F_ISSET(dbc->dbp, DB_AM_READ_UNCOMMITTED)) {
2909                 if ((ret = __TLPUT(dbc, cp->lock)) != 0)
2910                         return (ret);
2911                 cp->lock_mode = DB_LOCK_WWRITE;
2912                 if (cp->page != NULL &&
2913                     (ret = __memp_shared(dbp->mpf, cp->page)) != 0)
2914                         return (ret);
2915         }
2916         /* If we're not going to try and delete the page, we're done. */
2917         if (!delete_page)
2918                 return (0);
2919
2920         /*
2921          * Lock the previous and next pages before latching the parent
2922          * sub tree.
2923          */
2924         if (STD_LOCKING(dbc)) {
2925                 if ((pgno = PREV_PGNO(cp->page)) != PGNO_INVALID &&
2926                     (ret = __db_lget(dbc,
2927                     0, pgno, DB_LOCK_WRITE, 0, &prev_lock)) != 0)
2928                         return (ret);
2929                 if ((pgno = NEXT_PGNO(cp->page)) != PGNO_INVALID &&
2930                     (ret = __db_lget(dbc,
2931                     0, pgno, DB_LOCK_WRITE, 0, &next_lock)) != 0) {
2932                         (void)__TLPUT(dbc, next_lock);
2933                         return (ret);
2934                 }
2935         }
2936         DISCARD_CUR(dbc, ret);
2937         if (ret != 0)
2938                 goto err;
2939         ret = __bam_search(dbc, PGNO_INVALID, &key, SR_DEL, 0, NULL, &exact);
2940
2941         /*
2942          * If everything worked, delete the stack, otherwise, release the
2943          * stack and page locks without further damage.
2944          */
2945         if (ret == 0)
2946                 ret = __bam_dpages(dbc, 1, BTD_RELINK);
2947         else
2948                 (void)__bam_stkrel(dbc, 0);
2949
2950 err:    (void)__TLPUT(dbc, prev_lock);
2951         (void)__TLPUT(dbc, next_lock);
2952         return (ret);
2953 }
2954
2955 /*
2956  * __bamc_getstack --
2957  *      Acquire a full stack for a cursor.
2958  */
2959 static int
2960 __bamc_getstack(dbc)
2961         DBC *dbc;
2962 {
2963         BTREE_CURSOR *cp;
2964         DB *dbp;
2965         DBT dbt;
2966         DB_MPOOLFILE *mpf;
2967         PAGE *h;
2968         int exact, ret, t_ret;
2969
2970         dbp = dbc->dbp;
2971         mpf = dbp->mpf;
2972         cp = (BTREE_CURSOR *)dbc->internal;
2973
2974         /*
2975          * Get the page with the current item on it.  The caller of this
2976          * routine has to already hold a read lock on the page, so there
2977          * is no additional lock to acquire.
2978          */
2979         if ((ret = __memp_fget(mpf, &cp->pgno,
2980              dbc->thread_info, dbc->txn, 0, &h)) != 0)
2981                 return (ret);
2982
2983         /* Get a copy of a key from the page. */
2984         memset(&dbt, 0, sizeof(DBT));
2985         ret = __db_ret(dbc, h, 0, &dbt,
2986              &dbc->my_rkey.data, &dbc->my_rkey.ulen);
2987         if ((t_ret = __memp_fput(mpf,
2988              dbc->thread_info, h, dbc->priority)) != 0 && ret == 0)
2989                 ret = t_ret;
2990         if (ret != 0)
2991                 return (ret);
2992
2993         /* Get a write-locked stack for the page. */
2994         exact = 0;
2995         ret = __bam_search(dbc, PGNO_INVALID,
2996             &dbt, SR_KEYFIRST, 1, NULL, &exact);
2997
2998         return (ret);
2999 }
3000
3001 /*
3002  * __bam_isopd --
3003  *      Return if the cursor references an off-page duplicate tree via its
3004  *      page number.
3005  */
3006 static int
3007 __bam_isopd(dbc, pgnop)
3008         DBC *dbc;
3009         db_pgno_t *pgnop;
3010 {
3011         BOVERFLOW *bo;
3012
3013         if (TYPE(dbc->internal->page) != P_LBTREE)
3014                 return (0);
3015
3016         bo = GET_BOVERFLOW(dbc->dbp,
3017             dbc->internal->page, dbc->internal->indx + O_INDX);
3018         if (B_TYPE(bo->type) == B_DUPLICATE) {
3019                 *pgnop = bo->pgno;
3020                 return (1);
3021         }
3022         return (0);
3023 }
3024
3025 /*
3026  * __bam_opd_exists --
3027  *      Return if the current position has any data.
3028  * PUBLIC: int  __bam_opd_exists __P((DBC *, db_pgno_t));
3029  */
3030 int
3031 __bam_opd_exists(dbc, pgno)
3032         DBC *dbc;
3033         db_pgno_t pgno;
3034 {
3035         PAGE *h;
3036         int ret;
3037
3038         if ((ret = __memp_fget(dbc->dbp->mpf, &pgno,
3039             dbc->thread_info, dbc->txn, 0, &h)) != 0)
3040                 return (ret);
3041
3042         /*
3043          * We always collapse OPD trees so we only need to check
3044          * the number of entries on the root.  If there is a non-empty
3045          * tree then there will be duplicates.
3046          */
3047         if (NUM_ENT(h) == 0)
3048                 ret = 0;
3049         else
3050                 ret = DB_KEYEXIST;
3051
3052         (void)__memp_fput(dbc->dbp->mpf, dbc->thread_info, h, dbc->priority);
3053
3054         return (ret);
3055 }