Remove definition of builtin function
[platform/upstream/db4.git] / btree / bt_compact.c
1 /*-
2  * See the file LICENSE for redistribution information.
3  *
4  * Copyright (c) 1999-2009 Oracle.  All rights reserved.
5  *
6  * $Id$
7  */
8
9 #include "db_config.h"
10
11 #include "db_int.h"
12 #include "dbinc/db_page.h"
13 #include "dbinc/btree.h"
14 #include "dbinc/lock.h"
15 #include "dbinc/log.h"
16 #include "dbinc/mp.h"
17 #include "dbinc/txn.h"
18
19 static int __bam_compact_dups __P((DBC *,
20      PAGE **, u_int32_t, int, DB_COMPACT *, int *));
21 static int __bam_compact_int __P((DBC *,
22      DBT *, DBT *, u_int32_t, int *, DB_COMPACT *, int *));
23 static int __bam_compact_isdone __P((DBC *, DBT *, PAGE *, int *));
24 static int __bam_csearch __P((DBC *, DBT *, u_int32_t, int));
25 static int __bam_lock_tree __P((DBC *, EPG *, EPG *csp, u_int32_t, u_int32_t));
26 static int __bam_lock_subtree __P((DBC *, PAGE *, u_int32_t, u_int32_t));
27 static int __bam_merge __P((DBC *,
28      DBC *,  u_int32_t, DBT *, DB_COMPACT *,int *));
29 static int __bam_merge_internal __P((DBC *, DBC *, int, DB_COMPACT *, int *));
30 static int __bam_merge_pages __P((DBC *, DBC *, DB_COMPACT *));
31 static int __bam_merge_records __P((DBC *, DBC*,  u_int32_t, DB_COMPACT *));
32 static int __bam_truncate_internal_overflow __P((DBC *, PAGE *, DB_COMPACT *));
33 static int __bam_truncate_overflow __P((DBC *,
34      db_pgno_t, PAGE **, DB_COMPACT *));
35 static int __bam_truncate_page __P((DBC *, PAGE **, PAGE *, int));
36 static int __bam_truncate_root_page __P((DBC *,
37      PAGE *, u_int32_t, DB_COMPACT *));
38
39 #ifdef HAVE_FTRUNCATE
40 static int __bam_free_freelist __P((DB *, DB_THREAD_INFO *, DB_TXN *));
41 static int __bam_savekey __P((DBC *, int, DBT *));
42 static int __bam_setup_freelist __P((DB *, db_pglist_t *, u_int32_t));
43 static int __bam_truncate_internal __P((DB *,
44      DB_THREAD_INFO *, DB_TXN *, DB_COMPACT *));
45 #endif
46
47 #define SAVE_START                                                      \
48         do {                                                            \
49                 save_data = *c_data;                                    \
50                 ret = __db_retcopy(env,                         \
51                      &save_start, current.data, current.size,           \
52                      &save_start.data, &save_start.ulen);               \
53         } while (0)
54
55 /*
56  * Only restore those things that are negated by aborting the
57  * transaction.  We don't restore the number of deadlocks, for example.
58  */
59
60 #define RESTORE_START                                                   \
61         do {                                                            \
62                 c_data->compact_pages_free =                            \
63                       save_data.compact_pages_free;                     \
64                 c_data->compact_levels = save_data.compact_levels;      \
65                 c_data->compact_truncate = save_data.compact_truncate;  \
66                 ret = __db_retcopy(env, &current,                       \
67                      save_start.data, save_start.size,                  \
68                      &current.data, &current.ulen);                     \
69         } while (0)
70
71 /*
72  e __bam_compact -- compact a btree.
73  *
74  * PUBLIC: int __bam_compact __P((DB *, DB_THREAD_INFO *, DB_TXN *,
75  * PUBLIC:     DBT *, DBT *, DB_COMPACT *, u_int32_t, DBT *));
76  */
77 int
78 __bam_compact(dbp, ip, txn, start, stop, c_data, flags, end)
79         DB *dbp;
80         DB_THREAD_INFO *ip;
81         DB_TXN *txn;
82         DBT *start, *stop;
83         DB_COMPACT *c_data;
84         u_int32_t flags;
85         DBT *end;
86 {
87         DBC *dbc;
88         DBT current, save_start;
89         DB_COMPACT save_data;
90         ENV *env;
91         u_int32_t factor, retry;
92         int deadlock, have_freelist, isdone, ret, span, t_ret, txn_local;
93
94 #ifdef HAVE_FTRUNCATE
95         db_pglist_t *list;
96         db_pgno_t last_pgno;
97         u_int32_t nelems, truncated;
98 #endif
99
100         env = dbp->env;
101
102         memset(&current, 0, sizeof(current));
103         memset(&save_start, 0, sizeof(save_start));
104         dbc = NULL;
105         factor = 0;
106         have_freelist = deadlock = isdone = ret = span = 0;
107         ret = retry = 0;
108
109 #ifdef HAVE_FTRUNCATE
110         list = NULL;
111         last_pgno = 0;
112         nelems = truncated = 0;
113 #endif
114
115         /*
116          * We pass "current" to the internal routine, indicating where that
117          * routine should begin its work and expecting that it will return to
118          * us the last key that it processed.
119          */
120         if (start != NULL && (ret = __db_retcopy(env,
121              &current, start->data, start->size,
122              &current.data, &current.ulen)) != 0)
123                 return (ret);
124
125         if (IS_DB_AUTO_COMMIT(dbp, txn))
126                 txn_local = 1;
127         else
128                 txn_local = 0;
129         if (!LF_ISSET(DB_FREE_SPACE | DB_FREELIST_ONLY))
130                 goto no_free;
131         if (LF_ISSET(DB_FREELIST_ONLY))
132                 LF_SET(DB_FREE_SPACE);
133
134 #ifdef HAVE_FTRUNCATE
135         /* Sort the freelist and set up the in-memory list representation. */
136         if (txn_local && (ret = __txn_begin(env, ip, NULL, &txn, 0)) != 0)
137                 goto err;
138
139         if ((ret = __db_free_truncate(dbp, ip,
140              txn, flags, c_data, &list, &nelems, &last_pgno)) != 0) {
141                 LF_CLR(DB_FREE_SPACE);
142                 goto terr;
143         }
144
145         /* If the freelist is empty and we are not filling, get out. */
146         if (nelems == 0 && LF_ISSET(DB_FREELIST_ONLY)) {
147                 ret = 0;
148                 LF_CLR(DB_FREE_SPACE);
149                 goto terr;
150         }
151         if ((ret = __bam_setup_freelist(dbp, list, nelems)) != 0) {
152                 /* Someone else owns the free list. */
153                 if (ret == EBUSY)
154                         ret = 0;
155         }
156         if (ret == 0)
157                 have_freelist = 1;
158
159         /* Commit the txn and release the meta page lock. */
160 terr:   if (txn_local) {
161                 if ((t_ret = __txn_commit(txn, DB_TXN_NOSYNC)) != 0 && ret == 0)
162                         ret = t_ret;
163                 txn = NULL;
164         }
165         if (ret != 0)
166                 goto err;
167
168         /* Save the number truncated so far, we will add what we get below. */
169         truncated = c_data->compact_pages_truncated;
170         if (LF_ISSET(DB_FREELIST_ONLY))
171                 goto done;
172 #endif
173
174         /*
175          * We want factor to be the target number of free bytes on each page,
176          * so we know when to stop adding items to a page.   Make sure to
177          * subtract the page overhead when computing this target.  This can
178          * result in a 1-2% error on the smallest page.
179          * First figure out how many bytes we should use:
180          */
181 no_free:
182         factor = dbp->pgsize - SIZEOF_PAGE;
183         if (c_data->compact_fillpercent != 0) {
184                 factor *= c_data->compact_fillpercent;
185                 factor /= 100;
186         }
187         /* Now convert to the number of free bytes to target. */
188         factor = (dbp->pgsize - SIZEOF_PAGE) - factor;
189
190         if (c_data->compact_pages == 0)
191                 c_data->compact_pages = DB_MAX_PAGES;
192
193         do {
194                 deadlock = 0;
195
196                 SAVE_START;
197                 if (ret != 0)
198                         break;
199
200                 if (txn_local) {
201                         if ((ret = __txn_begin(env, ip, NULL, &txn, 0)) != 0)
202                                 break;
203
204                         if (c_data->compact_timeout != 0 &&
205                             (ret = __txn_set_timeout(txn,
206                             c_data->compact_timeout, DB_SET_LOCK_TIMEOUT)) != 0)
207                                 goto err;
208                 }
209
210                 if ((ret = __db_cursor(dbp, ip, txn, &dbc, 0)) != 0)
211                         goto err;
212
213                 if ((ret = __bam_compact_int(dbc, &current, stop, factor,
214                      &span, c_data, &isdone)) ==
215                      DB_LOCK_DEADLOCK && txn_local) {
216                         /*
217                          * We retry on deadlock.  Cancel the statistics
218                          * and reset the start point to before this
219                          * iteration.
220                          */
221                         deadlock = 1;
222                         c_data->compact_deadlock++;
223                         RESTORE_START;
224                 }
225                 /*
226                  * If we could not get a lock while holding an internal
227                  * node latched, commit the current local transaction otherwise
228                  * report a deadlock.
229                  */
230                 if (ret == DB_LOCK_NOTGRANTED) {
231                         if (txn_local || retry++ < 5)
232                                 ret = 0;
233                         else
234                                 ret = DB_LOCK_DEADLOCK;
235                 } else
236                         retry = 0;
237
238                 if ((t_ret = __dbc_close(dbc)) != 0 && ret == 0)
239                         ret = t_ret;
240
241 err:            if (txn_local && txn != NULL) {
242                         if (ret == 0 && deadlock == 0)
243                                 ret = __txn_commit(txn, DB_TXN_NOSYNC);
244                         else if ((t_ret = __txn_abort(txn)) != 0 && ret == 0)
245                                 ret = t_ret;
246                         txn = NULL;
247                 }
248         } while (ret == 0 && !isdone);
249
250         if (ret == 0 && end != NULL)
251                 ret = __db_retcopy(env, end, current.data, current.size,
252                     &end->data, &end->ulen);
253         if (current.data != NULL)
254                 __os_free(env, current.data);
255         if (save_start.data != NULL)
256                 __os_free(env, save_start.data);
257
258 #ifdef HAVE_FTRUNCATE
259         /*
260          * Finish up truncation work.  If there are pages left in the free
261          * list then search the internal nodes of the tree as we may have
262          * missed some while walking the leaf nodes.  Then calculate how
263          * many pages we have truncated and release the in-memory free list.
264          */
265 done:   if (LF_ISSET(DB_FREE_SPACE)) {
266                 DBMETA *meta;
267                 db_pgno_t pgno;
268
269                 pgno = PGNO_BASE_MD;
270                 isdone = 1;
271                 if (ret == 0 && !LF_ISSET(DB_FREELIST_ONLY) && (t_ret =
272                     __memp_fget(dbp->mpf, &pgno, ip, txn, 0, &meta)) == 0) {
273                         isdone = meta->free == PGNO_INVALID;
274                         ret = __memp_fput(dbp->mpf, ip, meta, dbp->priority);
275                 }
276
277                 if (!isdone)
278                         ret = __bam_truncate_internal(dbp, ip, txn, c_data);
279
280                 /* Clean up the free list. */
281                 if (list != NULL)
282                         __os_free(env, list);
283
284                 if ((t_ret =
285                     __memp_fget(dbp->mpf, &pgno, ip, txn, 0, &meta)) == 0) {
286                         c_data->compact_pages_truncated =
287                             truncated + last_pgno - meta->last_pgno;
288                         if ((t_ret = __memp_fput(dbp->mpf, ip,
289                             meta, dbp->priority)) != 0 && ret == 0)
290                                 ret = t_ret;
291                 } else if (ret == 0)
292                         ret = t_ret;
293
294                 if (have_freelist && (t_ret =
295                     __bam_free_freelist(dbp, ip, txn)) != 0 && ret == 0)
296                         t_ret = ret;
297         }
298 #endif
299
300         return (ret);
301 }
302
303 /*
304  * __bam_csearch -- isolate search code for bam_compact.
305  * This routine hides the differences between searching
306  * a BTREE and a RECNO from the rest of the code.
307  */
308 #define CS_READ 0       /* We are just reading. */
309 #define CS_PARENT       1       /* We want the parent too, write lock. */
310 #define CS_NEXT         2       /* Get the next page. */
311 #define CS_NEXT_WRITE   3       /* Get the next page and write lock. */
312 #define CS_DEL          4       /* Get a stack to delete a page. */
313 #define CS_START        5       /* Starting level for stack, write lock. */
314 #define CS_NEXT_BOTH    6       /* Get this page and the next, write lock. */
315 #define CS_GETRECNO     0x80    /* Extract record number from start. */
316
317 static int
318 __bam_csearch(dbc, start, sflag, level)
319         DBC *dbc;
320         DBT *start;
321         u_int32_t sflag;
322         int level;
323 {
324         BTREE_CURSOR *cp;
325         int not_used, ret;
326
327         cp = (BTREE_CURSOR *)dbc->internal;
328
329         if (dbc->dbtype == DB_RECNO) {
330                 /* If GETRECNO is not set the cp->recno is what we want. */
331                 if (FLD_ISSET(sflag, CS_GETRECNO)) {
332                         if (start == NULL || start->size == 0)
333                                 cp->recno = 1;
334                         else if ((ret =
335                              __ram_getno(dbc, start, &cp->recno, 0)) != 0)
336                                 return (ret);
337                         FLD_CLR(sflag, CS_GETRECNO);
338                 }
339                 switch (sflag) {
340                 case CS_READ:
341                         sflag = SR_READ;
342                         break;
343                 case CS_NEXT:
344                         sflag = SR_PARENT | SR_READ;
345                         break;
346                 case CS_START:
347                         level = LEAFLEVEL;
348                         /* FALLTHROUGH */
349                 case CS_DEL:
350                 case CS_NEXT_WRITE:
351                         sflag = SR_STACK;
352                         break;
353                 case CS_NEXT_BOTH:
354                         sflag = SR_BOTH | SR_NEXT | SR_WRITE;
355                         break;
356                 case CS_PARENT:
357                         sflag = SR_PARENT | SR_WRITE;
358                         break;
359                 default:
360                         return (__env_panic(dbc->env, EINVAL));
361                 }
362                 if ((ret = __bam_rsearch(dbc,
363                      &cp->recno, sflag, level, &not_used)) != 0)
364                         return (ret);
365                 /* Reset the cursor's recno to the beginning of the page. */
366                 cp->recno -= cp->csp->indx;
367         } else {
368                 FLD_CLR(sflag, CS_GETRECNO);
369                 switch (sflag) {
370                 case CS_READ:
371                         sflag = SR_READ | SR_DUPFIRST;
372                         break;
373                 case CS_DEL:
374                         sflag = SR_DEL;
375                         break;
376                 case CS_NEXT:
377                         sflag = SR_NEXT;
378                         break;
379                 case CS_NEXT_WRITE:
380                         sflag = SR_NEXT | SR_WRITE;
381                         break;
382                 case CS_NEXT_BOTH:
383                         sflag = SR_BOTH | SR_NEXT | SR_WRITE;
384                         break;
385                 case CS_START:
386                         sflag = SR_START | SR_WRITE;
387                         break;
388                 case CS_PARENT:
389                         sflag = SR_PARENT | SR_WRITE;
390                         break;
391                 default:
392                         return (__env_panic(dbc->env, EINVAL));
393                 }
394                 if (start == NULL || start->size == 0)
395                         FLD_SET(sflag, SR_MIN);
396
397                 if ((ret = __bam_search(dbc,
398                      cp->root, start, sflag, level, NULL, &not_used)) != 0)
399                         return (ret);
400         }
401
402         return (0);
403 }
404
405 /*
406  * __bam_compact_int -- internal compaction routine.
407  *      Called either with a cursor on the main database
408  * or a cursor initialized to the root of an off page duplicate
409  * tree.
410  */
411 static int
412 __bam_compact_int(dbc, start, stop, factor, spanp, c_data, donep)
413         DBC *dbc;
414         DBT *start, *stop;
415         u_int32_t factor;
416         int *spanp;
417         DB_COMPACT *c_data;
418         int *donep;
419 {
420         BTREE_CURSOR *cp, *ncp;
421         DB *dbp;
422         DBC *ndbc;
423         DB_LOCK metalock, next_lock, nnext_lock, prev_lock, saved_lock;
424         DB_MPOOLFILE *dbmp;
425         ENV *env;
426         EPG *epg;
427         PAGE *pg, *ppg, *npg;
428         db_pgno_t metapgno, npgno, nnext_pgno;
429         db_pgno_t pgno, prev_pgno, ppgno, saved_pgno;
430         db_recno_t next_recno;
431         u_int32_t sflag, pgs_free;
432         int check_dups, check_trunc, clear_root, isdone;
433         int merged, nentry, next_p, pgs_done, ret, t_ret, tdone;
434
435 #ifdef  DEBUG
436 #define CTRACE(dbc, location, t, start, f) do {                         \
437                 DBT __trace;                                            \
438                 DB_SET_DBT(__trace, t, strlen(t));                      \
439                 DEBUG_LWRITE(                                           \
440                     dbc, (dbc)->txn, location, &__trace, start, f)      \
441         } while (0)
442 #define PTRACE(dbc, location, p, start, f) do {                         \
443                 char __buf[32];                                         \
444                 (void)snprintf(__buf,                                   \
445                     sizeof(__buf), "pgno: %lu", (u_long)p);             \
446                 CTRACE(dbc, location, __buf, start, f);                 \
447         } while (0)
448 #else
449 #define CTRACE(dbc, location, t, start, f)
450 #define PTRACE(dbc, location, p, start, f)
451 #endif
452
453         ndbc = NULL;
454         pg = NULL;
455         npg = NULL;
456
457         isdone = 0;
458         tdone = 0;
459         pgs_done = 0;
460         next_recno = 0;
461         next_p = 0;
462         clear_root = 0;
463         metapgno = PGNO_BASE_MD;
464         LOCK_INIT(next_lock);
465         LOCK_INIT(nnext_lock);
466         LOCK_INIT(saved_lock);
467         LOCK_INIT(metalock);
468         LOCK_INIT(prev_lock);
469         check_trunc = c_data->compact_truncate != PGNO_INVALID;
470         check_dups = (!F_ISSET(dbc, DBC_OPD) &&
471              F_ISSET(dbc->dbp, DB_AM_DUP)) || check_trunc;
472
473         dbp = dbc->dbp;
474         env = dbp->env;
475         dbmp = dbp->mpf;
476         cp = (BTREE_CURSOR *)dbc->internal;
477         pgs_free = c_data->compact_pages_free;
478
479         /* Search down the tree for the starting point. */
480         if ((ret = __bam_csearch(dbc,
481             start, CS_READ | CS_GETRECNO, LEAFLEVEL)) != 0) {
482                 /* Its not an error to compact an empty db. */
483                 if (ret == DB_NOTFOUND)
484                         ret = 0;
485                 isdone = 1;
486                 goto err;
487         }
488
489         /*
490          * Get the first leaf page. The loop below will change pg so
491          * we clear the stack reference so we don't put a a page twice.
492          */
493         pg = cp->csp->page;
494         cp->csp->page = NULL;
495         next_recno = cp->recno;
496 next:   /*
497          * This is the start of the main compaction loop.  There are 3
498          * parts to the process:
499          * 1) Walk the leaf pages of the tree looking for a page to
500          *      process.  We do this with read locks.  Save the
501          *      key from the page and release it.
502          * 2) Set up a cursor stack which will write lock the page
503          *      and enough of its ancestors to get the job done.
504          *      This could go to the root if we might delete a subtree
505          *      or we have record numbers to update.
506          * 3) Loop fetching pages after the above page and move enough
507          *      data to fill it.
508          * We exit the loop if we are at the end of the leaf pages, are
509          * about to lock a new subtree (we span) or on error.
510          */
511
512         /* Walk the pages looking for something to fill up. */
513         while ((npgno = NEXT_PGNO(pg)) != PGNO_INVALID) {
514                 c_data->compact_pages_examine++;
515                 PTRACE(dbc, "Next", PGNO(pg), start, 0);
516
517                 /* If we have fetched the next page, get the new key. */
518                 if (next_p == 1 &&
519                     dbc->dbtype != DB_RECNO && NUM_ENT(pg) != 0) {
520                         if ((ret = __db_ret(dbc, pg, 0, start,
521                             &start->data, &start->ulen)) != 0)
522                                 goto err;
523                 }
524                 next_recno += NUM_ENT(pg);
525                 if (P_FREESPACE(dbp, pg) > factor ||
526                      (check_trunc && PGNO(pg) > c_data->compact_truncate))
527                         break;
528                 if (stop != NULL && stop->size > 0) {
529                         if ((ret = __bam_compact_isdone(dbc,
530                             stop, pg, &isdone)) != 0)
531                                 goto err;
532                         if (isdone)
533                                 goto done;
534                 }
535
536                 /*
537                  * The page does not need more data or to be swapped,
538                  * check to see if we want to look at possible duplicate
539                  * trees or overflow records and the move on to the next page.
540                  */
541                 cp->recno += NUM_ENT(pg);
542                 next_p = 1;
543                 tdone = pgs_done;
544                 PTRACE(dbc, "Dups", PGNO(pg), start, 0);
545                 if (check_dups && (ret = __bam_compact_dups(
546                      dbc, &pg, factor, 0, c_data, &pgs_done)) != 0)
547                         goto err;
548                 npgno = NEXT_PGNO(pg);
549                 if ((ret = __memp_fput(dbmp,
550                      dbc->thread_info, pg, dbc->priority)) != 0)
551                         goto err;
552                 pg = NULL;
553                 /*
554                  * If we don't do anything we don't need to hold
555                  * the lock on the previous page, so couple always.
556                  */
557                 if ((ret = __db_lget(dbc,
558                     tdone == pgs_done ? LCK_COUPLE_ALWAYS : LCK_COUPLE,
559                     npgno, DB_LOCK_READ, 0, &cp->csp->lock)) != 0)
560                         goto err;
561                 if ((ret = __memp_fget(dbmp, &npgno,
562                      dbc->thread_info, dbc->txn, 0, &pg)) != 0)
563                         goto err;
564         }
565
566         /*
567          * When we get here we have 3 cases:
568          * 1) We've reached the end of the leaf linked list and are done.
569          * 2) A page whose freespace exceeds our target and therefore needs
570          *      to have data added to it.
571          * 3) A page that doesn't have too much free space but needs to be
572          *      checked for truncation.
573          * In both cases 2 and 3, we need that page's first key or record
574          * number.  We may already have it, if not get it here.
575          */
576         if ((nentry = NUM_ENT(pg)) != 0) {
577                 next_p = 0;
578                 /* Get a copy of the first recno on the page. */
579                 if (dbc->dbtype == DB_RECNO) {
580                         if ((ret = __db_retcopy(dbp->env, start,
581                              &cp->recno, sizeof(cp->recno),
582                              &start->data, &start->ulen)) != 0)
583                                 goto err;
584                 } else if (start->size == 0 && (ret = __db_ret(dbc,
585                     pg, 0, start, &start->data, &start->ulen)) != 0)
586                         goto err;
587
588                 if (npgno == PGNO_INVALID) {
589                         /* End of the tree, check its duplicates and exit. */
590                         PTRACE(dbc, "GoDone", PGNO(pg), start, 0);
591                         if (check_dups && (ret = __bam_compact_dups(dbc,
592                            &pg, factor, 0, c_data, &pgs_done)) != 0)
593                                 goto err;
594                         c_data->compact_pages_examine++;
595                         isdone = 1;
596                         goto done;
597                 }
598         }
599
600         /* Release the page so we don't deadlock getting its parent. */
601         if ((ret = __memp_fput(dbmp, dbc->thread_info, pg, dbc->priority)) != 0)
602                 goto err;
603         if ((ret = __LPUT(dbc, cp->csp->lock)) != 0)
604                 goto err;
605         BT_STK_CLR(cp);
606         pg = NULL;
607         saved_pgno = PGNO_INVALID;
608         prev_pgno = PGNO_INVALID;
609         nnext_pgno = PGNO_INVALID;
610
611         /*
612          * We must lock the metadata page first because we cannot block
613          * while holding interior nodes of the tree pinned.
614          */
615
616         if (!LOCK_ISSET(metalock) && pgs_free == c_data->compact_pages_free &&
617             (ret = __db_lget(dbc,
618             LCK_ALWAYS, metapgno, DB_LOCK_WRITE, 0, &metalock)) != 0)
619                 goto err;
620
621         /*
622          * Setup the cursor stack. There are 3 cases:
623          * 1) the page is empty and will be deleted: nentry == 0.
624          * 2) the next page has the same parent: *spanp == 0.
625          * 3) the next page has a different parent: *spanp == 1.
626          *
627          * We now need to search the tree again, getting a write lock
628          * on the page we are going to merge or delete.  We do this by
629          * searching down the tree and locking as much of the subtree
630          * above the page as needed.  In the case of a delete we will
631          * find the maximal subtree that can be deleted. In the case
632          * of merge if the current page and the next page are siblings
633          * with the same parent then we only need to lock the parent.
634          * Otherwise *span will be set and we need to search to find the
635          * lowest common ancestor.  Dbc will be set to contain the subtree
636          * containing the page to be merged or deleted. Ndbc will contain
637          * the minimal subtree containing that page and its next sibling.
638          * In all cases for DB_RECNO we simplify things and get the whole
639          * tree if we need more than a single parent.
640          * The tree can collapse while we don't have it locked, so the
641          * page we are looking for may be gone.  If so we are at
642          * the right most end of the leaf pages and are done.
643          */
644
645 retry:  pg = NULL;
646         if (npg != NULL && (ret = __memp_fput(dbmp,
647              dbc->thread_info, npg, dbc->priority)) != 0)
648                 goto err;
649         npg = NULL;
650         if (ndbc != NULL) {
651                 ncp = (BTREE_CURSOR *)ndbc->internal;
652                 if (clear_root == 1) {
653                         ncp->sp->page = NULL;
654                         LOCK_INIT(ncp->sp->lock);
655                 }
656                 if ((ret = __bam_stkrel(ndbc, 0)) != 0)
657                         goto err;
658         }
659         clear_root = 0;
660         /* Case 1 -- page is empty. */
661         if (nentry == 0) {
662                 CTRACE(dbc, "Empty", "", start, 0);
663                 if (next_p == 1)
664                         sflag = CS_NEXT_WRITE;
665                 else
666                         sflag = CS_DEL;
667                 if ((ret = __bam_csearch(dbc, start, sflag, LEAFLEVEL)) != 0) {
668                         isdone = 1;
669                         if (ret == DB_NOTFOUND)
670                                 ret = 0;
671                         goto err;
672                 }
673
674                 pg = cp->csp->page;
675                 /* Check to see if the page is still empty. */
676                 if (NUM_ENT(pg) != 0)
677                         npgno = PGNO(pg);
678                 else {
679                         npgno = NEXT_PGNO(pg);
680                         /* If this is now the root, we are very done. */
681                         if (PGNO(pg) == cp->root)
682                                 isdone = 1;
683                         else {
684                                 if (npgno != PGNO_INVALID) {
685                                         TRY_LOCK(dbc, npgno, saved_pgno,
686                                             next_lock, DB_LOCK_WRITE, retry);
687                                         if (ret != 0)
688                                                 goto err;
689                                 }
690                                 if (PREV_PGNO(pg) != PGNO_INVALID) {
691                                         TRY_LOCK(dbc, PREV_PGNO(pg), prev_pgno,
692                                             prev_lock, DB_LOCK_WRITE, retry);
693                                         if (ret != 0)
694                                                 goto err;
695                                 }
696                                 if ((ret =
697                                     __bam_dpages(dbc, 0, BTD_RELINK)) != 0)
698                                         goto err;
699                                 c_data->compact_pages_free++;
700                                 if ((ret = __TLPUT(dbc, prev_lock)) != 0)
701                                         goto err;
702                                 LOCK_INIT(prev_lock);
703                                 if ((ret = __TLPUT(dbc, next_lock)) != 0)
704                                         goto err;
705                                 LOCK_INIT(next_lock);
706                                 goto next_no_release;
707                         }
708                 }
709                 goto next_page;
710         }
711
712         /* case 3 -- different parents. */
713         if (*spanp) {
714                 CTRACE(dbc, "Span", "", start, 0);
715                 /*
716                  * Search the tree looking for the page containing and
717                  * the next page after the current key.
718                  * The stack will be rooted at the page that spans
719                  * the current and next pages. The two subtrees
720                  * are returned below that.  For BTREE the current
721                  * page subtreee will be first while for RECNO the
722                  * next page subtree will be first
723                  */
724                 if (ndbc == NULL && (ret = __dbc_dup(dbc, &ndbc, 0)) != 0)
725                         goto err;
726                 ncp = (BTREE_CURSOR *)ndbc->internal;
727
728                 ncp->recno = cp->recno;
729                 cp->recno = next_recno;
730
731                 if ((ret = __bam_csearch(dbc, start, CS_NEXT_BOTH, 0)) != 0) {
732                         if (ret == DB_NOTFOUND) {
733                                 isdone = 1;
734                                 ret = 0;
735                         }
736                         goto err;
737                 }
738
739                 /*
740                  * Find the top of the stack for the second subtree.
741                  */
742                 for (epg = cp->csp - 1; epg > cp->sp; epg--)
743                         if (LEVEL(epg->page) == LEAFLEVEL)
744                                 break;
745                 DB_ASSERT(env, epg != cp->sp);
746
747                 /*
748                  * Copy the root. We will have two instances of the
749                  * same page, be careful not to free both.
750                  */
751                 BT_STK_PUSH(env, ncp, cp->sp->page, cp->sp->indx,
752                      cp->sp->lock, cp->sp->lock_mode, ret);
753                 if (ret != 0)
754                         goto err;
755                 clear_root = 1;
756
757                 /* Copy the stack containing the next page. */
758                 for (epg++; epg <= cp->csp; epg++) {
759                         BT_STK_PUSH(env, ncp, epg->page, epg->indx,
760                              epg->lock, epg->lock_mode, ret);
761                         if (ret != 0)
762                                 goto err;
763                 }
764                 /* adjust the stack pointer to remove these items. */
765                 ncp->csp--;
766                 cp->csp -= ncp->csp - ncp->sp;
767
768                 /*
769                  * If this is RECNO then we want to swap the stacks.
770                  */
771                 if (dbp->type == DB_RECNO) {
772                         ndbc->internal = (DBC_INTERNAL *)cp;
773                         dbc->internal = (DBC_INTERNAL *)ncp;
774                         cp = ncp;
775                         ncp = (BTREE_CURSOR *)ndbc->internal;
776                         cp->sp->indx--;
777                 } else
778                         ncp->sp->indx++;
779
780                 DB_ASSERT(env,
781                     NEXT_PGNO(cp->csp->page) == PGNO(ncp->csp->page));
782                 pg = cp->csp->page;
783
784                 /*
785                  * The page may have emptied while we waited for the
786                  * lock or the record we are looking for may have
787                  * moved.
788                  * Reset npgno so we re-get this page when we go back
789                  * to the top.
790                  */
791                 if (NUM_ENT(pg) == 0 ||
792                      (dbc->dbtype == DB_RECNO &&
793                      NEXT_PGNO(cp->csp->page) != PGNO(ncp->csp->page))) {
794                         npgno = PGNO(pg);
795                         *spanp = 0;
796                         goto next_page;
797                 }
798
799                 if (check_trunc && PGNO(pg) > c_data->compact_truncate) {
800                         if (PREV_PGNO(pg) != PGNO_INVALID) {
801                                 TRY_LOCK2(dbc, ndbc, PREV_PGNO(pg), prev_pgno,
802                                     prev_lock, DB_LOCK_WRITE, retry);
803                                 if (ret != 0)
804                                         goto err1;
805                         }
806                         pgs_done++;
807                         /* Get a fresh low numbered page. */
808                         if ((ret = __bam_truncate_page(dbc,
809                             &pg, ncp->csp->page, 1)) != 0)
810                                 goto err1;
811                         if ((ret = __TLPUT(dbc, prev_lock)) != 0)
812                                 goto err;
813                         LOCK_INIT(prev_lock);
814                 }
815                 *spanp = 0;
816                 PTRACE(dbc, "SDups", PGNO(ncp->csp->page), start, 0);
817                 if (check_dups && (ret = __bam_compact_dups(ndbc,
818                      &ncp->csp->page, factor, 1, c_data, &pgs_done)) != 0)
819                         goto err1;
820
821                 /* Check to see if the tree collapsed. */
822                 if (PGNO(ncp->csp->page) == ncp->root)
823                         goto done;
824
825                 pg = cp->csp->page;
826                 npgno = NEXT_PGNO(pg);
827                 PTRACE(dbc, "SDups", PGNO(pg), start, 0);
828                 if (check_dups && (ret =
829                      __bam_compact_dups(dbc, &cp->csp->page,
830                      factor, 1, c_data, &pgs_done)) != 0)
831                         goto err1;
832
833                 /*
834                  * We may have dropped our locks, check again
835                  * to see if we still need to fill this page and
836                  * we are in a spanning situation.
837                  */
838
839                 if (P_FREESPACE(dbp, pg) <= factor ||
840                      cp->csp[-1].indx != NUM_ENT(cp->csp[-1].page) - 1)
841                         goto next_page;
842
843                 /*
844                  * Try to move things into a single parent.
845                  */
846                 merged = 0;
847                 for (epg = cp->sp; epg != cp->csp; epg++) {
848                         PTRACE(dbc, "PMerge", PGNO(epg->page), start, 0);
849                         if ((ret = __bam_merge_internal(dbc,
850                             ndbc, LEVEL(epg->page), c_data, &merged)) != 0)
851                                 break;
852                         if (merged)
853                                 break;
854                 }
855
856                 if (ret != 0 && ret != DB_LOCK_NOTGRANTED)
857                         goto err1;
858                 /*
859                  * If we merged the parent, then we no longer span.
860                  * Otherwise if we tried to merge the parent but would
861                  * block on one of the other leaf pages try again.
862                  * If we did not merge any records of the parent,
863                  * exit to commit any local transactions and try again.
864                  */
865                 if (merged || ret == DB_LOCK_NOTGRANTED) {
866                         if (merged)
867                                 pgs_done++;
868                         else
869                                 goto done;
870                         if (cp->csp->page == NULL)
871                                 goto deleted;
872                         npgno = PGNO(pg);
873                         next_recno = cp->recno;
874                         goto next_page;
875                 }
876                 PTRACE(dbc, "SMerge", PGNO(cp->csp->page), start, 0);
877
878                 /* if we remove the next page, then we need its next locked */
879                 npgno = NEXT_PGNO(ncp->csp->page);
880                 if (npgno != PGNO_INVALID) {
881                         TRY_LOCK2(dbc, ndbc, npgno,
882                             nnext_pgno, nnext_lock, DB_LOCK_WRITE, retry);
883                         if (ret != 0)
884                                 goto err1;
885                 }
886                 if ((ret = __bam_merge(dbc,
887                      ndbc, factor, stop, c_data, &isdone)) != 0)
888                         goto err1;
889                 pgs_done++;
890                 /*
891                  * __bam_merge could have freed our stack if it
892                  * deleted a page possibly collapsing the tree.
893                  */
894                 if (cp->csp->page == NULL)
895                         goto deleted;
896                 cp->recno += NUM_ENT(pg);
897
898                 if ((ret = __TLPUT(dbc, nnext_lock)) != 0)
899                         goto err1;
900                 LOCK_INIT(nnext_lock);
901
902                 /* If we did not bump to the next page something did not fit. */
903                 if (npgno != NEXT_PGNO(pg)) {
904                         npgno = NEXT_PGNO(pg);
905                         goto next_page;
906                 }
907         } else {
908                 /* Case 2 -- same parents. */
909                 CTRACE(dbc, "Sib", "", start, 0);
910                 if ((ret =
911                     __bam_csearch(dbc, start, CS_PARENT, LEAFLEVEL)) != 0) {
912                         if (ret == DB_NOTFOUND) {
913                                 isdone = 1;
914                                 ret = 0;
915                         }
916                         goto err;
917                 }
918
919                 pg = cp->csp->page;
920                 DB_ASSERT(env, IS_DIRTY(pg));
921                 DB_ASSERT(env,
922                     PGNO(pg) == cp->root || IS_DIRTY(cp->csp[-1].page));
923
924                 /* We now have a write lock, recheck the page. */
925                 if ((nentry = NUM_ENT(pg)) == 0) {
926                         npgno = PGNO(pg);
927                         goto next_page;
928                 }
929
930                 /* Check duplicate trees, we have a write lock on the page. */
931                 PTRACE(dbc, "SibDup", PGNO(pg), start, 0);
932                 if (check_dups && (ret =
933                      __bam_compact_dups(dbc, &cp->csp->page,
934                      factor, 1, c_data, &pgs_done)) != 0)
935                         goto err1;
936                 pg = cp->csp->page;
937                 npgno = NEXT_PGNO(pg);
938
939                 /* Check to see if the tree collapsed. */
940                 if (PGNO(pg) == cp->root)
941                         goto err1;
942                 DB_ASSERT(env, cp->csp - cp->sp == 1);
943
944                 /* After re-locking check to see if we still need to fill. */
945                 if (P_FREESPACE(dbp, pg) <= factor) {
946                         if (check_trunc &&
947                             PGNO(pg) > c_data->compact_truncate) {
948                                 if (PREV_PGNO(pg) != PGNO_INVALID) {
949                                         TRY_LOCK(dbc, PREV_PGNO(pg), prev_pgno,
950                                             prev_lock, DB_LOCK_WRITE, retry);
951                                         if (ret != 0)
952                                                 goto err1;
953                                 }
954                                 if (npgno != PGNO_INVALID) {
955                                         TRY_LOCK(dbc, npgno, saved_pgno,
956                                             next_lock, DB_LOCK_WRITE, retry);
957                                         if (ret != 0)
958                                                 goto err1;
959                                 }
960                                 pgs_done++;
961                                 /* Get a fresh low numbered page. */
962                                 if ((ret = __bam_truncate_page(dbc,
963                                     &pg, NULL, 1)) != 0)
964                                         goto err1;
965                                 if ((ret = __TLPUT(dbc, prev_lock)) != 0)
966                                         goto err1;
967                                 LOCK_INIT(prev_lock);
968                                 if ((ret = __TLPUT(dbc, next_lock)) != 0)
969                                         goto err1;
970                                 LOCK_INIT(next_lock);
971                         }
972                         goto next_page;
973                 }
974
975                 /* If they have the same parent, just dup the cursor */
976                 if (ndbc != NULL && (ret = __dbc_close(ndbc)) != 0)
977                         goto err1;
978                 if ((ret = __dbc_dup(dbc, &ndbc, DB_POSITION)) != 0)
979                         goto err1;
980                 ncp = (BTREE_CURSOR *)ndbc->internal;
981
982                 /*
983                  * ncp->recno needs to have the recno of the next page.
984                  * Bump it by the number of records on the current page.
985                  */
986                 ncp->recno += NUM_ENT(pg);
987         }
988
989         pgno = PGNO(cp->csp->page);
990         ppgno = PGNO(cp->csp[-1].page);
991         /* Fetch pages until we fill this one. */
992         while (!isdone && npgno != PGNO_INVALID &&
993              P_FREESPACE(dbp, pg) > factor && c_data->compact_pages != 0) {
994                 /*
995                  * merging may have to free the parent page, if it does,
996                  * refetch it but do it decending the tree.
997                  */
998                 epg = &cp->csp[-1];
999                 if ((ppg = epg->page) == NULL) {
1000                         if ((ret = __memp_fput(dbmp, dbc->thread_info,
1001                              cp->csp->page, dbc->priority)) != 0)
1002                                 goto err1;
1003                         pg = NULL;
1004                         if ((ret = __memp_fget(dbmp, &ppgno, dbc->thread_info,
1005                             dbc->txn, DB_MPOOL_DIRTY, &ppg)) != 0)
1006                                 goto err1;
1007                         if ((ret = __memp_fget(dbmp, &pgno, dbc->thread_info,
1008                             dbc->txn, DB_MPOOL_DIRTY, &pg)) != 0)
1009                                 goto err1;
1010                         epg->page = ppg;
1011                         cp->csp->page = pg;
1012                 }
1013
1014                 /*
1015                  * If our current position is the last one on a parent
1016                  * page, then we are about to merge across different
1017                  * internal nodes.  Thus, we need to lock higher up
1018                  * in the tree.  We will exit the routine and commit
1019                  * what we have done so far.  Set spanp so we know
1020                  * we are in this case when we come back.
1021                  */
1022                 if (epg->indx == NUM_ENT(ppg) - 1) {
1023                         *spanp = 1;
1024                         npgno = PGNO(pg);
1025                         next_recno = cp->recno;
1026                         epg->page = ppg;
1027                         goto next_page;
1028                 }
1029
1030                 /* Lock and get the next page. */
1031                 TRY_LOCK(dbc, npgno,
1032                     saved_pgno, saved_lock, DB_LOCK_WRITE, retry);
1033                 if (ret != 0)
1034                         goto err1;
1035                 if ((ret = __LPUT(dbc, ncp->lock)) != 0)
1036                         goto err1;
1037                 ncp->lock = saved_lock;
1038                 LOCK_INIT(saved_lock);
1039                 saved_pgno = PGNO_INVALID;
1040
1041                 if ((ret = __memp_fget(dbmp, &npgno,
1042                     dbc->thread_info, dbc->txn, DB_MPOOL_DIRTY, &npg)) != 0)
1043                         goto err1;
1044
1045                 if (check_trunc &&
1046                     PGNO(pg) > c_data->compact_truncate) {
1047                         if (PREV_PGNO(pg) != PGNO_INVALID) {
1048                                 TRY_LOCK(dbc, PREV_PGNO(pg),
1049                                     prev_pgno, prev_lock, DB_LOCK_WRITE, retry);
1050                                 if (ret != 0)
1051                                         goto err1;
1052                         }
1053                         pgs_done++;
1054                         /* Get a fresh low numbered page. */
1055                         if ((ret = __bam_truncate_page(dbc, &pg, npg, 1)) != 0)
1056                                 goto err1;
1057                         if ((ret = __TLPUT(dbc, prev_lock)) != 0)
1058                                 goto err1;
1059                         LOCK_INIT(prev_lock);
1060                         pgno = PGNO(pg);
1061                 }
1062                 c_data->compact_pages_examine++;
1063
1064                 PTRACE(dbc, "MDups", PGNO(npg), start, 0);
1065                 if (check_dups && (ret = __bam_compact_dups(ndbc,
1066                      &npg, factor, 1, c_data, &pgs_done)) != 0)
1067                         goto err1;
1068
1069                 npgno = NEXT_PGNO(npg);
1070                 if (npgno != PGNO_INVALID) {
1071                         TRY_LOCK(dbc, npgno,
1072                             nnext_pgno, nnext_lock, DB_LOCK_WRITE, retry);
1073                         if (ret != 0)
1074                                 goto err1;
1075                 }
1076
1077                 /* copy the common parent to the stack. */
1078                 BT_STK_PUSH(env, ncp, ppg,
1079                      epg->indx + 1, epg->lock, epg->lock_mode, ret);
1080                 if (ret != 0)
1081                         goto err1;
1082
1083                 /* Put the page on the stack. */
1084                 BT_STK_ENTER(env, ncp, npg, 0, ncp->lock, DB_LOCK_WRITE, ret);
1085
1086                 LOCK_INIT(ncp->lock);
1087                 npg = NULL;
1088
1089                 /*
1090                  * Merge the pages.  This will either free the next
1091                  * page or just update its parent pointer.
1092                  */
1093                 PTRACE(dbc, "Merge", PGNO(cp->csp->page), start, 0);
1094                 if ((ret = __bam_merge(dbc,
1095                      ndbc, factor, stop, c_data, &isdone)) != 0)
1096                         goto err1;
1097
1098                 pgs_done++;
1099
1100                 if ((ret = __TLPUT(dbc, nnext_lock)) != 0)
1101                         goto err1;
1102                 LOCK_INIT(nnext_lock);
1103
1104                 /*
1105                  * __bam_merge could have freed our stack if it
1106                  * deleted a page possibly collapsing the tree.
1107                  */
1108                 if (cp->csp->page == NULL)
1109                         goto deleted;
1110                 /* If we did not bump to the next page something did not fit. */
1111                 if (npgno != NEXT_PGNO(pg))
1112                         break;
1113         }
1114
1115         /* Bottom of the main loop.  Move to the next page. */
1116         npgno = NEXT_PGNO(pg);
1117         cp->recno += NUM_ENT(pg);
1118         next_recno = cp->recno;
1119
1120 next_page:
1121         if (ndbc != NULL) {
1122                 ncp = (BTREE_CURSOR *)ndbc->internal;
1123                 if (ncp->sp->page == cp->sp->page) {
1124                         ncp->sp->page = NULL;
1125                         LOCK_INIT(ncp->sp->lock);
1126                 }
1127                 if ((ret = __bam_stkrel(ndbc,
1128                      pgs_done == 0 ? STK_NOLOCK : 0)) != 0)
1129                         goto err;
1130         }
1131         /*
1132          * Unlatch the tree before trying to lock the next page.  We must
1133          * unlatch to avoid a latch deadlock but we want to hold the
1134          * lock on the parent node so this leaf cannot be unlinked.
1135          */
1136         pg = NULL;
1137         if ((ret = __bam_stkrel(dbc, STK_PGONLY)) != 0)
1138                 goto err;
1139         if ((ret = __db_lget(dbc, 0, npgno, DB_LOCK_READ, 0, &next_lock)) != 0)
1140                 goto err;
1141         if ((ret = __bam_stkrel(dbc, pgs_done == 0 ? STK_NOLOCK : 0)) != 0)
1142                 goto err;
1143         if ((ret = __TLPUT(dbc, saved_lock)) != 0)
1144                 goto err;
1145         if ((ret = __TLPUT(dbc, prev_lock)) != 0)
1146                 goto err;
1147
1148 next_no_release:
1149         pg = NULL;
1150
1151         if (npgno == PGNO_INVALID || c_data->compact_pages  == 0)
1152                 isdone = 1;
1153         if (!isdone) {
1154                 /*
1155                  * If we are at the end of this parent commit the
1156                  * transaction so we don't tie things up.
1157                  */
1158                 if (pgs_done != 0 && *spanp) {
1159 deleted:                if (((ret = __bam_stkrel(ndbc, 0)) != 0 ||
1160                              (ret = __dbc_close(ndbc)) != 0))
1161                                 goto err;
1162                         *donep = 0;
1163                         return (0);
1164                 }
1165
1166                 /* Reget the next page to look at. */
1167                 cp->recno = next_recno;
1168                 if ((ret = __memp_fget(dbmp, &npgno,
1169                     dbc->thread_info, dbc->txn, 0, &pg)) != 0)
1170                         goto err;
1171                 cp->csp->lock = next_lock;
1172                 LOCK_INIT(next_lock);
1173                 next_p = 1;
1174                 /* If we did not do anything we can drop the metalock. */
1175                 if (pgs_done == 0 && (ret = __LPUT(dbc, metalock)) != 0)
1176                         goto err;
1177                 goto next;
1178         }
1179
1180 done:
1181         if (0) {
1182                 /*
1183                  * We come here if pg came from cp->csp->page and could
1184                  * have already been fput.
1185                  */
1186 err1:           pg = NULL;
1187         }
1188 err:    /*
1189          * Don't release locks (STK_PGONLY)if we had an error, we could reveal
1190          * a bad tree to a dirty reader.  Wait till the abort to free the locks.
1191          */
1192         sflag = STK_CLRDBC;
1193         if (dbc->txn != NULL && ret != 0)
1194                 sflag |= STK_PGONLY;
1195         if (ndbc != NULL) {
1196                 ncp = (BTREE_CURSOR *)ndbc->internal;
1197                 if (npg == ncp->csp->page)
1198                         npg = NULL;
1199                 if (ncp->sp->page == cp->sp->page) {
1200                         ncp->sp->page = NULL;
1201                         LOCK_INIT(ncp->sp->lock);
1202                 }
1203                 if ((t_ret = __bam_stkrel(ndbc, sflag)) != 0 && ret == 0)
1204                         ret = t_ret;
1205                 else if ((t_ret = __dbc_close(ndbc)) != 0 && ret == 0)
1206                         ret = t_ret;
1207         }
1208         if (pg == cp->csp->page)
1209                 pg = NULL;
1210         if ((t_ret = __bam_stkrel(dbc, sflag)) != 0 && ret == 0)
1211                 ret = t_ret;
1212
1213         if ((t_ret = __TLPUT(dbc, metalock)) != 0 && ret == 0)
1214                 ret = t_ret;
1215
1216         if (pg != NULL && (t_ret =
1217              __memp_fput(dbmp,
1218                   dbc->thread_info, pg, dbc->priority) != 0) && ret == 0)
1219                 ret = t_ret;
1220         if (npg != NULL && (t_ret =
1221              __memp_fput(dbmp,
1222                   dbc->thread_info, npg, dbc->priority) != 0) && ret == 0)
1223                 ret = t_ret;
1224
1225         *donep = isdone;
1226
1227         return (ret);
1228 }
1229
1230 /*
1231  * __bam_merge -- do actual merging of leaf pages.
1232  */
1233 static int
1234 __bam_merge(dbc, ndbc, factor, stop, c_data, donep)
1235         DBC *dbc, *ndbc;
1236         u_int32_t factor;
1237         DBT *stop;
1238         DB_COMPACT *c_data;
1239         int *donep;
1240 {
1241         BTREE_CURSOR *cp, *ncp;
1242         DB *dbp;
1243         PAGE *pg, *npg;
1244         db_indx_t nent;
1245         int ret;
1246
1247         DB_ASSERT(NULL, dbc != NULL);
1248         DB_ASSERT(NULL, ndbc != NULL);
1249         dbp = dbc->dbp;
1250         cp = (BTREE_CURSOR *)dbc->internal;
1251         ncp = (BTREE_CURSOR *)ndbc->internal;
1252         pg = cp->csp->page;
1253         npg = ncp->csp->page;
1254
1255         nent = NUM_ENT(npg);
1256
1257         /* If the page is empty just throw it away. */
1258         if (nent == 0)
1259                 goto free_page;
1260
1261         /* Find if the stopping point is on this page. */
1262         if (stop != NULL && stop->size != 0) {
1263                 if ((ret = __bam_compact_isdone(dbc, stop, npg, donep)) != 0)
1264                         return (ret);
1265                 if (*donep)
1266                         return (0);
1267         }
1268
1269         /*
1270          * If there is too much data then just move records one at a time.
1271          * Otherwise copy the data space over and fix up the index table.
1272          * If we are on the left most child we will effect our parent's
1273          * index entry so we call merge_records to figure out key sizes.
1274          */
1275         if ((dbc->dbtype == DB_BTREE &&
1276             ncp->csp[-1].indx == 0 && ncp->csp[-1].entries != 1) ||
1277             (int)(P_FREESPACE(dbp, pg) -
1278             ((dbp->pgsize - P_OVERHEAD(dbp)) -
1279             P_FREESPACE(dbp, npg))) < (int)factor)
1280                 ret = __bam_merge_records(dbc, ndbc, factor, c_data);
1281         else
1282 free_page:      ret = __bam_merge_pages(dbc, ndbc, c_data);
1283
1284         return (ret);
1285 }
1286
1287 static int
1288 __bam_merge_records(dbc, ndbc, factor, c_data)
1289         DBC *dbc, *ndbc;
1290         u_int32_t factor;
1291         DB_COMPACT *c_data;
1292 {
1293         BINTERNAL *bi;
1294         BKEYDATA *bk, *tmp_bk;
1295         BTREE *t;
1296         BTREE_CURSOR *cp, *ncp;
1297         DB *dbp;
1298         DBT a, b, data, hdr;
1299         ENV *env;
1300         EPG *epg;
1301         PAGE *pg, *npg;
1302         db_indx_t adj, indx, nent, *ninp, pind;
1303         int32_t adjust;
1304         u_int32_t freespace, nksize, pfree, size;
1305         int first_dup, is_dup, next_dup, n_ok, ret;
1306         size_t (*func) __P((DB *, const DBT *, const DBT *));
1307
1308         dbp = dbc->dbp;
1309         env = dbp->env;
1310         t = dbp->bt_internal;
1311         cp = (BTREE_CURSOR *)dbc->internal;
1312         ncp = (BTREE_CURSOR *)ndbc->internal;
1313         pg = cp->csp->page;
1314         npg = ncp->csp->page;
1315         memset(&hdr, 0, sizeof(hdr));
1316         pind = NUM_ENT(pg);
1317         n_ok = 0;
1318         adjust = 0;
1319         ret = 0;
1320         nent = NUM_ENT(npg);
1321
1322         DB_ASSERT(env, nent != 0);
1323
1324         /* See if we want to swap out this page. */
1325         if (c_data->compact_truncate != PGNO_INVALID &&
1326              PGNO(npg) > c_data->compact_truncate) {
1327                 /* Get a fresh low numbered page. */
1328                 if ((ret = __bam_truncate_page(ndbc, &npg, pg, 1)) != 0)
1329                         goto err;
1330         }
1331
1332         ninp = P_INP(dbp, npg);
1333
1334         /*
1335          * pg is the page that is being filled, it is in the stack in cp.
1336          * npg is the next page, it is in the stack in ncp.
1337          */
1338         freespace = P_FREESPACE(dbp, pg);
1339
1340         adj = TYPE(npg) == P_LBTREE ? P_INDX : O_INDX;
1341         /*
1342          * Loop through the records and find the stopping point.
1343          */
1344         for (indx = 0; indx < nent; indx += adj)  {
1345                 bk = GET_BKEYDATA(dbp, npg, indx);
1346
1347                 /* Size of the key. */
1348                 size = BITEM_PSIZE(bk);
1349
1350                 /* Size of the data. */
1351                 if (TYPE(pg) == P_LBTREE)
1352                         size += BITEM_PSIZE(GET_BKEYDATA(dbp, npg, indx + 1));
1353                 /*
1354                  * If we are at a duplicate set, skip ahead to see and
1355                  * get the total size for the group.
1356                  */
1357                 n_ok = adj;
1358                 if (TYPE(pg) == P_LBTREE &&
1359                      indx < nent - adj &&
1360                      ninp[indx] == ninp[indx + adj]) {
1361                         do {
1362                                 /* Size of index for key reference. */
1363                                 size += sizeof(db_indx_t);
1364                                 n_ok++;
1365                                 /* Size of data item. */
1366                                 size += BITEM_PSIZE(
1367                                     GET_BKEYDATA(dbp, npg, indx + n_ok));
1368                                 n_ok++;
1369                         } while (indx + n_ok < nent &&
1370                             ninp[indx] == ninp[indx + n_ok]);
1371                 }
1372                 /* if the next set will not fit on the page we are done. */
1373                 if (freespace < size)
1374                         break;
1375
1376                 /*
1377                  * Otherwise figure out if we are past the goal and if
1378                  * adding this set will put us closer to the goal than
1379                  * we are now.
1380                  */
1381                 if ((freespace - size) < factor) {
1382                         if (freespace - factor > factor - (freespace - size))
1383                                 indx += n_ok;
1384                         break;
1385                 }
1386                 freespace -= size;
1387                 indx += n_ok - adj;
1388         }
1389
1390         /* If we have hit the first record then there is nothing we can move. */
1391         if (indx == 0)
1392                 goto done;
1393         if (TYPE(pg) != P_LBTREE && TYPE(pg) != P_LDUP) {
1394                 if (indx == nent)
1395                         return (__bam_merge_pages(dbc, ndbc, c_data));
1396                 goto no_check;
1397         }
1398         /*
1399          * We need to update npg's parent key.  Avoid creating a new key
1400          * that will be too big. Get what space will be available on the
1401          * parents. Then if there will not be room for this key, see if
1402          * prefix compression will make it work, if not backup till we
1403          * find something that will.  (Needless to say, this is a very
1404          * unlikely event.)  If we are deleting this page then we will
1405          * need to propagate the next key to our grand parents, so we
1406          * see if that will fit.
1407          */
1408         pfree = dbp->pgsize;
1409         for (epg = &ncp->csp[-1]; epg >= ncp->sp; epg--)
1410                 if ((freespace = P_FREESPACE(dbp, epg->page)) < pfree) {
1411                         bi = GET_BINTERNAL(dbp, epg->page, epg->indx);
1412                         /* Add back in the key we will be deleting. */
1413                         freespace += BINTERNAL_PSIZE(bi->len);
1414                         if (freespace < pfree)
1415                                 pfree = freespace;
1416                         if (epg->indx != 0)
1417                                 break;
1418                 }
1419
1420         /*
1421          * If we are at the end, we will delete this page.  We need to
1422          * check the next parent key only if we are the leftmost page and
1423          * will therefore have to propagate the key up the tree.
1424          */
1425         if (indx == nent) {
1426                 if (ncp->csp[-1].indx != 0 || ncp->csp[-1].entries == 1 ||
1427                      BINTERNAL_PSIZE(GET_BINTERNAL(dbp,
1428                      ncp->csp[-1].page, 1)->len) <= pfree)
1429                         return (__bam_merge_pages(dbc, ndbc, c_data));
1430                 indx -= adj;
1431         }
1432         bk = GET_BKEYDATA(dbp, npg, indx);
1433         if (indx != 0 && BINTERNAL_SIZE(bk->len) >= pfree) {
1434                 if (F_ISSET(dbc, DBC_OPD)) {
1435                         if (dbp->dup_compare == __bam_defcmp)
1436                                 func = __bam_defpfx;
1437                         else
1438                                 func = NULL;
1439                 } else
1440                         func = t->bt_prefix;
1441         } else
1442                 func = NULL;
1443
1444         /* Skip to the beginning of a duplicate set. */
1445         while (indx != 0 && ninp[indx] == ninp[indx - adj])
1446                 indx -= adj;
1447
1448         while (indx != 0 && BINTERNAL_SIZE(bk->len) >= pfree) {
1449                 if (B_TYPE(bk->type) != B_KEYDATA)
1450                         goto noprefix;
1451                 /*
1452                  * Figure out if we can truncate this key.
1453                  * Code borrowed from bt_split.c
1454                  */
1455                 if (func == NULL)
1456                         goto noprefix;
1457                 tmp_bk = GET_BKEYDATA(dbp, npg, indx - adj);
1458                 if (B_TYPE(tmp_bk->type) != B_KEYDATA)
1459                         goto noprefix;
1460                 memset(&a, 0, sizeof(a));
1461                 a.size = tmp_bk->len;
1462                 a.data = tmp_bk->data;
1463                 memset(&b, 0, sizeof(b));
1464                 b.size = bk->len;
1465                 b.data = bk->data;
1466                 nksize = (u_int32_t)func(dbp, &a, &b);
1467                 if (BINTERNAL_PSIZE(nksize) < pfree)
1468                         break;
1469 noprefix:
1470                 /* Skip to the beginning of a duplicate set. */
1471                 do {
1472                         indx -= adj;
1473                 } while (indx != 0 &&  ninp[indx] == ninp[indx - adj]);
1474
1475                 bk = GET_BKEYDATA(dbp, npg, indx);
1476         }
1477
1478         /*
1479          * indx references the first record that will not move to the previous
1480          * page.  If it is 0 then we could not find a key that would fit in
1481          * the parent that would permit us to move any records.
1482          */
1483         if (indx == 0)
1484                 goto done;
1485         DB_ASSERT(env, indx <= nent);
1486
1487         /* Loop through the records and move them from npg to pg. */
1488 no_check: is_dup = first_dup = next_dup = 0;
1489         pg = cp->csp->page;
1490         npg = ncp->csp->page;
1491         DB_ASSERT(env, IS_DIRTY(pg));
1492         DB_ASSERT(env, IS_DIRTY(npg));
1493         ninp = P_INP(dbp, npg);
1494         do {
1495                 bk = GET_BKEYDATA(dbp, npg, 0);
1496                 /* Figure out if we are in a duplicate group or not. */
1497                 if ((NUM_ENT(npg) % 2) == 0) {
1498                         if (NUM_ENT(npg) > 2 && ninp[0] == ninp[2]) {
1499                                 if (!is_dup) {
1500                                         first_dup = 1;
1501                                         is_dup = 1;
1502                                 } else
1503                                         first_dup = 0;
1504
1505                                 next_dup = 1;
1506                         } else if (next_dup) {
1507                                 is_dup = 1;
1508                                 first_dup = 0;
1509                                 next_dup = 0;
1510                         } else
1511                                 is_dup = 0;
1512                 }
1513
1514                 if (is_dup && !first_dup && (pind % 2) == 0) {
1515                         /* Duplicate key. */
1516                         if ((ret = __bam_adjindx(dbc,
1517                              pg, pind, pind - P_INDX, 1)) != 0)
1518                                 goto err;
1519                         if (!next_dup)
1520                                 is_dup = 0;
1521                 } else switch (B_TYPE(bk->type)) {
1522                 case B_KEYDATA:
1523                         hdr.data = bk;
1524                         hdr.size = SSZA(BKEYDATA, data);
1525                         data.size = bk->len;
1526                         data.data = bk->data;
1527                         if ((ret = __db_pitem(dbc, pg, pind,
1528                              BKEYDATA_SIZE(bk->len), &hdr, &data)) != 0)
1529                                 goto err;
1530                         break;
1531                 case B_OVERFLOW:
1532                 case B_DUPLICATE:
1533                         data.size = BOVERFLOW_SIZE;
1534                         data.data = bk;
1535                         if ((ret = __db_pitem(dbc, pg, pind,
1536                              BOVERFLOW_SIZE, &data, NULL)) != 0)
1537                                 goto err;
1538                         break;
1539                 default:
1540                         __db_errx(env,
1541                             "Unknown record format, page %lu, indx 0",
1542                             (u_long)PGNO(pg));
1543                         ret = EINVAL;
1544                         goto err;
1545                 }
1546                 pind++;
1547                 if (next_dup && (NUM_ENT(npg) % 2) == 0) {
1548                         if ((ret = __bam_adjindx(ndbc,
1549                              npg, 0, O_INDX, 0)) != 0)
1550                                 goto err;
1551                 } else {
1552                         if ((ret = __db_ditem(ndbc,
1553                              npg, 0, BITEM_SIZE(bk))) != 0)
1554                                 goto err;
1555                 }
1556                 adjust++;
1557         } while (--indx != 0);
1558
1559         DB_ASSERT(env, NUM_ENT(npg) != 0);
1560
1561         if (adjust != 0 &&
1562              (F_ISSET(cp, C_RECNUM) || F_ISSET(dbc, DBC_OPD))) {
1563                 if (TYPE(pg) == P_LBTREE)
1564                         adjust /= P_INDX;
1565                 if ((ret = __bam_adjust(ndbc, -adjust)) != 0)
1566                         goto err;
1567
1568                 if ((ret = __bam_adjust(dbc, adjust)) != 0)
1569                         goto err;
1570         }
1571
1572         /* Update parent with new key. */
1573         if (ndbc->dbtype == DB_BTREE &&
1574             (ret = __bam_pupdate(ndbc, pg)) != 0)
1575                 goto err;
1576
1577 done:   if (cp->sp->page == ncp->sp->page) {
1578                 cp->sp->page = NULL;
1579                 LOCK_INIT(cp->sp->lock);
1580         }
1581         ret = __bam_stkrel(ndbc, STK_CLRDBC);
1582
1583 err:    return (ret);
1584 }
1585
1586 static int
1587 __bam_merge_pages(dbc, ndbc, c_data)
1588         DBC *dbc, *ndbc;
1589         DB_COMPACT *c_data;
1590 {
1591         BTREE_CURSOR *cp, *ncp;
1592         DB *dbp;
1593         DBT data, hdr;
1594         DB_MPOOLFILE *dbmp;
1595         PAGE *pg, *npg;
1596         db_indx_t nent, *ninp, *pinp;
1597         db_pgno_t ppgno;
1598         u_int8_t *bp;
1599         u_int32_t len;
1600         int i, level, ret;
1601
1602         COMPQUIET(ppgno, PGNO_INVALID);
1603         dbp = dbc->dbp;
1604         dbmp = dbp->mpf;
1605         cp = (BTREE_CURSOR *)dbc->internal;
1606         ncp = (BTREE_CURSOR *)ndbc->internal;
1607         pg = cp->csp->page;
1608         npg = ncp->csp->page;
1609         memset(&hdr, 0, sizeof(hdr));
1610         nent = NUM_ENT(npg);
1611
1612         /* If the page is empty just throw it away. */
1613         if (nent == 0)
1614                 goto free_page;
1615
1616         pg = cp->csp->page;
1617         npg = ncp->csp->page;
1618         DB_ASSERT(dbp->env, IS_DIRTY(pg));
1619         DB_ASSERT(dbp->env, IS_DIRTY(npg));
1620         DB_ASSERT(dbp->env, nent == NUM_ENT(npg));
1621
1622         /* Bulk copy the data to the new page. */
1623         len = dbp->pgsize - HOFFSET(npg);
1624         if (DBC_LOGGING(dbc)) {
1625                 memset(&hdr, 0, sizeof(hdr));
1626                 hdr.data = npg;
1627                 hdr.size = LOFFSET(dbp, npg);
1628                 memset(&data, 0, sizeof(data));
1629                 data.data = (u_int8_t *)npg + HOFFSET(npg);
1630                 data.size = len;
1631                 if ((ret = __bam_merge_log(dbp,
1632                      dbc->txn, &LSN(pg), 0, PGNO(pg),
1633                      &LSN(pg), PGNO(npg), &LSN(npg), &hdr, &data, 0)) != 0)
1634                         goto err;
1635         } else
1636                 LSN_NOT_LOGGED(LSN(pg));
1637         LSN(npg) = LSN(pg);
1638         bp = (u_int8_t *)pg + HOFFSET(pg) - len;
1639         memcpy(bp, (u_int8_t *)npg + HOFFSET(npg), len);
1640
1641         /* Copy index table offset by what was there already. */
1642         pinp = P_INP(dbp, pg) + NUM_ENT(pg);
1643         ninp = P_INP(dbp, npg);
1644         for (i = 0; i < NUM_ENT(npg); i++)
1645                 *pinp++ = *ninp++ - (dbp->pgsize - HOFFSET(pg));
1646         HOFFSET(pg) -= len;
1647         NUM_ENT(pg) += i;
1648
1649         NUM_ENT(npg) = 0;
1650         HOFFSET(npg) += len;
1651
1652         if (F_ISSET(cp, C_RECNUM) || F_ISSET(dbc, DBC_OPD)) {
1653                 /*
1654                  * There are two cases here regarding the stack.
1655                  * Either we have two two level stacks but only ndbc
1656                  * references the parent page or we have a multilevel
1657                  * stack and only ndbc has an entry for the spanning
1658                  * page.
1659                  */
1660                 if (TYPE(pg) == P_LBTREE)
1661                         i /= P_INDX;
1662                 if ((ret = __bam_adjust(ndbc, -i)) != 0)
1663                         goto err;
1664
1665                 if ((ret = __bam_adjust(dbc, i)) != 0)
1666                         goto err;
1667         }
1668
1669 free_page:
1670         /*
1671          * __bam_dpages may decide to collapse the tree.
1672          * This can happen if we have the root and there
1673          * are exactly 2 pointers left in it.
1674          * If it can collapse the tree we must free the other
1675          * stack since it will nolonger be valid.  This
1676          * must be done before hand because we cannot
1677          * hold a page pinned if it might be truncated.
1678          */
1679         if ((ret = __bam_relink(dbc,
1680             ncp->csp->page, cp->csp->page, PGNO_INVALID)) != 0)
1681                 goto err;
1682         /* Drop the duplicate reference to the sub tree root. */
1683         cp->sp->page = NULL;
1684         LOCK_INIT(cp->sp->lock);
1685         if (PGNO(ncp->sp->page) == ncp->root &&
1686             NUM_ENT(ncp->sp->page) == 2) {
1687                 if ((ret = __bam_stkrel(dbc, STK_CLRDBC | STK_PGONLY)) != 0)
1688                         goto err;
1689                 level = LEVEL(ncp->sp->page);
1690                 ppgno = PGNO(ncp->csp[-1].page);
1691         } else
1692                 level = 0;
1693         if (c_data->compact_truncate > PGNO(npg))
1694                 c_data->compact_truncate--;
1695         if ((ret = __bam_dpages(ndbc,
1696             0, ndbc->dbtype == DB_RECNO ? 0 : BTD_UPDATE)) != 0)
1697                 goto err;
1698         npg = NULL;
1699         c_data->compact_pages_free++;
1700         c_data->compact_pages--;
1701         if (level != 0) {
1702                 if ((ret = __memp_fget(dbmp, &ncp->root,
1703                     dbc->thread_info, dbc->txn, 0, &npg)) != 0)
1704                         goto err;
1705                 if (level == LEVEL(npg))
1706                         level = 0;
1707                 if ((ret = __memp_fput(dbmp,
1708                      dbc->thread_info, npg, dbc->priority)) != 0)
1709                         goto err;
1710                 npg = NULL;
1711                 if (level != 0) {
1712                         c_data->compact_levels++;
1713                         c_data->compact_pages_free++;
1714                         if (c_data->compact_truncate > ppgno)
1715                                 c_data->compact_truncate--;
1716                         if (c_data->compact_pages != 0)
1717                                 c_data->compact_pages--;
1718                 }
1719         }
1720
1721 err:    return (ret);
1722 }
1723
1724 /*
1725  * __bam_merge_internal --
1726  *      Merge internal nodes of the tree.
1727  */
1728 static int
1729 __bam_merge_internal(dbc, ndbc, level, c_data, merged)
1730         DBC *dbc, *ndbc;
1731         int level;
1732         DB_COMPACT *c_data;
1733         int *merged;
1734 {
1735         BINTERNAL bi, *bip, *fip;
1736         BTREE_CURSOR *cp, *ncp;
1737         DB *dbp;
1738         DBT data, hdr;
1739         DB_MPOOLFILE *dbmp;
1740         EPG *epg, *save_csp, *nsave_csp;
1741         PAGE *pg, *npg;
1742         RINTERNAL *rk;
1743         db_indx_t first, indx, pind;
1744         db_pgno_t ppgno;
1745         int32_t nrecs, trecs;
1746         u_int16_t size;
1747         u_int32_t freespace, pfree;
1748         int ret;
1749
1750         COMPQUIET(bip, NULL);
1751         COMPQUIET(ppgno, PGNO_INVALID);
1752         DB_ASSERT(NULL, dbc != NULL);
1753         DB_ASSERT(NULL, ndbc != NULL);
1754
1755         /*
1756          * ndbc will contain the the dominating parent of the subtree.
1757          * dbc will have the tree containing the left child.
1758          *
1759          * The stacks descend to the leaf level.
1760          * If this is a recno tree then both stacks will start at the root.
1761          */
1762         dbp = dbc->dbp;
1763         dbmp = dbp->mpf;
1764         cp = (BTREE_CURSOR *)dbc->internal;
1765         ncp = (BTREE_CURSOR *)ndbc->internal;
1766         *merged = 0;
1767         ret = 0;
1768
1769         /*
1770          * Set the stacks to the level requested.
1771          * Save the old value to restore when we exit.
1772          */
1773         save_csp = cp->csp;
1774         cp->csp = &cp->csp[-level + 1];
1775         pg = cp->csp->page;
1776         pind = NUM_ENT(pg);
1777
1778         nsave_csp = ncp->csp;
1779         ncp->csp = &ncp->csp[-level + 1];
1780         npg = ncp->csp->page;
1781         indx = NUM_ENT(npg);
1782
1783         /*
1784          * The caller may have two stacks that include common ancestors, we
1785          * check here for convenience.
1786          */
1787         if (npg == pg)
1788                 goto done;
1789
1790         if (TYPE(pg) == P_IBTREE) {
1791                 /*
1792                  * Check for overflow keys on both pages while we have
1793                  * them locked.
1794                  */
1795                  if ((ret =
1796                       __bam_truncate_internal_overflow(dbc, pg, c_data)) != 0)
1797                         goto err;
1798                  if ((ret =
1799                       __bam_truncate_internal_overflow(dbc, npg, c_data)) != 0)
1800                         goto err;
1801         }
1802
1803         /*
1804          * If we are about to move data off the left most page of an
1805          * internal node we will need to update its parents, make sure there
1806          * will be room for the new key on all the parents in the stack.
1807          * If not, move less data.
1808          */
1809         fip = NULL;
1810         if (TYPE(pg) == P_IBTREE) {
1811                 /* See where we run out of space. */
1812                 freespace = P_FREESPACE(dbp, pg);
1813                 /*
1814                  * The leftmost key of an internal page is not accurate.
1815                  * Go up the tree to find a non-leftmost parent.
1816                  */
1817                 epg = ncp->csp;
1818                 while (--epg >= ncp->sp && epg->indx == 0)
1819                         continue;
1820                 fip = bip = GET_BINTERNAL(dbp, epg->page, epg->indx);
1821                 epg = ncp->csp;
1822
1823                 for (indx = 0;;) {
1824                         size = BINTERNAL_PSIZE(bip->len);
1825                         if (size > freespace)
1826                                 break;
1827                         freespace -= size;
1828                         if (++indx >= NUM_ENT(npg))
1829                                 break;
1830                         bip = GET_BINTERNAL(dbp, npg, indx);
1831                 }
1832
1833                 /* See if we are deleting the page and we are not left most. */
1834                 if (indx == NUM_ENT(npg) && epg[-1].indx != 0)
1835                         goto fits;
1836
1837                 pfree = dbp->pgsize;
1838                 for (epg--; epg >= ncp->sp; epg--)
1839                         if ((freespace = P_FREESPACE(dbp, epg->page)) < pfree) {
1840                                 bip = GET_BINTERNAL(dbp, epg->page, epg->indx);
1841                                 /* Add back in the key we will be deleting. */
1842                                 freespace += BINTERNAL_PSIZE(bip->len);
1843                                 if (freespace < pfree)
1844                                         pfree = freespace;
1845                                 if (epg->indx != 0)
1846                                         break;
1847                         }
1848                 epg = ncp->csp;
1849
1850                 /* If we are at the end of the page we will delete it. */
1851                 if (indx == NUM_ENT(npg)) {
1852                         if (NUM_ENT(epg[-1].page) == 1)
1853                                 goto fits;
1854                         bip =
1855                              GET_BINTERNAL(dbp, epg[-1].page, epg[-1].indx + 1);
1856                 } else
1857                         bip = GET_BINTERNAL(dbp, npg, indx);
1858
1859                 /* Back up until we have a key that fits. */
1860                 while (indx != 0 && BINTERNAL_PSIZE(bip->len) > pfree) {
1861                         indx--;
1862                         bip = GET_BINTERNAL(dbp, npg, indx);
1863                 }
1864                 if (indx == 0)
1865                         goto done;
1866         }
1867
1868 fits:   memset(&bi, 0, sizeof(bi));
1869         memset(&hdr, 0, sizeof(hdr));
1870         memset(&data, 0, sizeof(data));
1871         trecs = 0;
1872
1873         /*
1874          * Copy data between internal nodes till one is full
1875          * or the other is empty.
1876          */
1877         first = 0;
1878         nrecs = 0;
1879         do {
1880                 if (dbc->dbtype == DB_BTREE) {
1881                         bip = GET_BINTERNAL(dbp, npg, 0);
1882                         size = fip == NULL ?
1883                              BINTERNAL_SIZE(bip->len) :
1884                              BINTERNAL_SIZE(fip->len);
1885                         if (P_FREESPACE(dbp, pg) < size + sizeof(db_indx_t))
1886                                 break;
1887
1888                         if (fip == NULL) {
1889                                 data.size = bip->len;
1890                                 data.data = bip->data;
1891                         } else {
1892                                 data.size = fip->len;
1893                                 data.data = fip->data;
1894                         }
1895                         bi.len = data.size;
1896                         B_TSET(bi.type, bip->type);
1897                         bi.pgno = bip->pgno;
1898                         bi.nrecs = bip->nrecs;
1899                         hdr.data = &bi;
1900                         hdr.size = SSZA(BINTERNAL, data);
1901                         if (F_ISSET(cp, C_RECNUM) || F_ISSET(dbc, DBC_OPD))
1902                                 nrecs = (int32_t)bip->nrecs;
1903                 } else {
1904                         rk = GET_RINTERNAL(dbp, npg, 0);
1905                         size = RINTERNAL_SIZE;
1906                         if (P_FREESPACE(dbp, pg) < size + sizeof(db_indx_t))
1907                                 break;
1908
1909                         hdr.data = rk;
1910                         hdr.size = size;
1911                         nrecs = (int32_t)rk->nrecs;
1912                 }
1913                 /*
1914                  * Try to lock the subtree leaf records without waiting.
1915                  * We must lock the subtree below the record we are merging
1916                  * and the one after it since that is were a search will wind
1917                  * up if it has already looked at our parent.  After the first
1918                  * move we have the current subtree already locked.
1919                  * If we merged any records then we will revisit this
1920                  * node when we merge its leaves.  If not we will return
1921                  * NOTGRANTED and our caller will do a retry.  We only
1922                  * need to do this if we are in a transation. If not then
1923                  * we cannot abort and things will be hosed up on error
1924                  * anyway.
1925                  */
1926                 if (dbc->txn != NULL && (ret = __bam_lock_tree(ndbc,
1927                     ncp->csp, nsave_csp, first,
1928                     NUM_ENT(ncp->csp->page) == 1 ? 1 : 2)) != 0) {
1929                         if (ret != DB_LOCK_NOTGRANTED)
1930                                 goto err;
1931                         break;
1932                 }
1933                 first = 1;
1934                 if ((ret = __db_pitem(dbc, pg, pind, size, &hdr, &data)) != 0)
1935                         goto err;
1936                 pind++;
1937                 if (fip != NULL) {
1938                         /* reset size to be for the record being deleted. */
1939                         size = BINTERNAL_SIZE(bip->len);
1940                         fip = NULL;
1941                 }
1942                 if ((ret = __db_ditem(ndbc, npg, 0, size)) != 0)
1943                         goto err;
1944                 *merged = 1;
1945                 trecs += nrecs;
1946         } while (--indx != 0);
1947
1948         if (!*merged)
1949                 goto done;
1950
1951         if (trecs != 0) {
1952                 cp->csp--;
1953                 ret = __bam_adjust(dbc, trecs);
1954                 if (ret != 0)
1955                         goto err;
1956                 cp->csp++;
1957                 ncp->csp--;
1958                 if ((ret = __bam_adjust(ndbc, -trecs)) != 0)
1959                         goto err;
1960                 ncp->csp++;
1961         }
1962
1963         /*
1964          * Either we emptied the page or we need to update its
1965          * parent to reflect the first page we now point to.
1966          * First get rid of the bottom of the stack,
1967          * bam_dpages will clear the stack.  Maintain transactional
1968          * locks on the leaf pages to protect changes at this level.
1969          */
1970         do {
1971                 if ((ret = __memp_fput(dbmp, dbc->thread_info,
1972                     nsave_csp->page, dbc->priority)) != 0)
1973                         goto err;
1974                 nsave_csp->page = NULL;
1975                 if ((ret = __TLPUT(dbc, nsave_csp->lock)) != 0)
1976                         goto err;
1977                 LOCK_INIT(nsave_csp->lock);
1978                 nsave_csp--;
1979         } while (nsave_csp != ncp->csp);
1980
1981         if (NUM_ENT(npg) == 0)  {
1982                 /*
1983                  * __bam_dpages may decide to collapse the tree
1984                  * so we need to free our other stack.  The tree
1985                  * will change in hight and our stack will nolonger
1986                  * be valid.
1987                  */
1988                 cp->csp = save_csp;
1989                 cp->sp->page = NULL;
1990                 LOCK_INIT(cp->sp->lock);
1991                 if (PGNO(ncp->sp->page) == ncp->root &&
1992                     NUM_ENT(ncp->sp->page) == 2) {
1993                         if ((ret = __bam_stkrel(dbc, STK_CLRDBC)) != 0)
1994                                 goto err;
1995                         level = LEVEL(ncp->sp->page);
1996                         ppgno = PGNO(ncp->csp[-1].page);
1997                 } else
1998                         level = 0;
1999
2000                 if (c_data->compact_truncate > PGNO(npg))
2001                         c_data->compact_truncate--;
2002                 ret = __bam_dpages(ndbc,
2003                      0, ndbc->dbtype == DB_RECNO ?
2004                      BTD_RELINK : BTD_UPDATE | BTD_RELINK);
2005                 c_data->compact_pages_free++;
2006                 if (ret == 0 && level != 0) {
2007                         if ((ret = __memp_fget(dbmp, &ncp->root,
2008                             dbc->thread_info, dbc->txn, 0, &npg)) != 0)
2009                                 goto err;
2010                         if (level == LEVEL(npg))
2011                                 level = 0;
2012                         if ((ret = __memp_fput(dbmp,
2013                             dbc->thread_info, npg, dbc->priority)) != 0)
2014                                 goto err;
2015                         npg = NULL;
2016                         if (level != 0) {
2017                                 c_data->compact_levels++;
2018                                 c_data->compact_pages_free++;
2019                                 if (c_data->compact_truncate > ppgno)
2020                                         c_data->compact_truncate--;
2021                                 if (c_data->compact_pages != 0)
2022                                         c_data->compact_pages--;
2023                         }
2024                 }
2025         } else {
2026                 ret = __bam_pupdate(ndbc, npg);
2027
2028                 if (NUM_ENT(npg) != 0 &&
2029                     c_data->compact_truncate != PGNO_INVALID &&
2030                     PGNO(npg) > c_data->compact_truncate &&
2031                     ncp->csp != ncp->sp) {
2032                         if ((ret = __bam_truncate_page(ndbc, &npg, pg, 1)) != 0)
2033                                 goto err;
2034                 }
2035                 if (c_data->compact_truncate != PGNO_INVALID &&
2036                      PGNO(pg) > c_data->compact_truncate && cp->csp != cp->sp) {
2037                         if ((ret = __bam_truncate_page(dbc, &pg, npg, 1)) != 0)
2038                                 goto err;
2039                 }
2040         }
2041         cp->csp = save_csp;
2042
2043         return (ret);
2044
2045 done:
2046 err:    cp->csp = save_csp;
2047         ncp->csp = nsave_csp;
2048
2049         return (ret);
2050 }
2051
2052 /*
2053  * __bam_compact_dups -- try to compress off page dup trees.
2054  * We may or may not have a write lock on this page.
2055  */
2056 static int
2057 __bam_compact_dups(dbc, ppg, factor, have_lock, c_data, donep)
2058         DBC *dbc;
2059         PAGE **ppg;
2060         u_int32_t factor;
2061         int have_lock;
2062         DB_COMPACT *c_data;
2063         int *donep;
2064 {
2065         BOVERFLOW *bo;
2066         BTREE_CURSOR *cp;
2067         DB *dbp;
2068         DBC *opd;
2069         DBT start;
2070         DB_MPOOLFILE *dbmp;
2071         ENV *env;
2072         PAGE *dpg, *pg;
2073         db_indx_t i;
2074         db_pgno_t pgno;
2075         int isdone, level, ret, span, t_ret;
2076
2077         span = 0;
2078         ret = 0;
2079         opd = NULL;
2080
2081         DB_ASSERT(NULL, dbc != NULL);
2082         dbp = dbc->dbp;
2083         env = dbp->env;
2084         dbmp = dbp->mpf;
2085         cp = (BTREE_CURSOR *)dbc->internal;
2086         pg = *ppg;
2087
2088         for (i = 0; i <  NUM_ENT(pg); i++) {
2089                 bo = GET_BOVERFLOW(dbp, pg, i);
2090                 if (B_TYPE(bo->type) == B_KEYDATA)
2091                         continue;
2092                 c_data->compact_pages_examine++;
2093                 if (bo->pgno > c_data->compact_truncate) {
2094                         (*donep)++;
2095                         if (!have_lock) {
2096                                 /*
2097                                  * The caller should have the page at
2098                                  * least read locked.  Drop the buffer
2099                                  * and get the write lock.
2100                                  */
2101                                 pgno = PGNO(pg);
2102                                 if ((ret = __memp_fput(dbmp, dbc->thread_info,
2103                                      pg, dbc->priority)) != 0)
2104                                         goto err;
2105                                 *ppg = NULL;
2106                                 if ((ret = __db_lget(dbc, 0, pgno,
2107                                      DB_LOCK_WRITE, 0, &cp->csp->lock)) != 0)
2108                                         goto err;
2109                                 have_lock = 1;
2110                                 if ((ret = __memp_fget(dbmp, &pgno,
2111                                     dbc->thread_info,
2112                                     dbc->txn, DB_MPOOL_DIRTY, ppg)) != 0)
2113                                         goto err;
2114                                 pg = *ppg;
2115                         }
2116                         if ((ret =
2117                              __bam_truncate_root_page(dbc, pg, i, c_data)) != 0)
2118                                 goto err;
2119                         /* Just in case it should move.  Could it? */
2120                         bo = GET_BOVERFLOW(dbp, pg, i);
2121                 }
2122
2123                 if (B_TYPE(bo->type) == B_OVERFLOW) {
2124                         if ((ret = __bam_truncate_overflow(dbc,
2125                             bo->pgno, have_lock ? NULL : ppg, c_data)) != 0)
2126                                 goto err;
2127                         (*donep)++;
2128                         continue;
2129                 }
2130                 /*
2131                  * Take a peek at the root.  If it's a leaf then
2132                  * there is no tree here, avoid all the trouble.
2133                  */
2134                 if ((ret = __memp_fget(dbmp, &bo->pgno,
2135                      dbc->thread_info, dbc->txn, 0, &dpg)) != 0)
2136                         goto err;
2137
2138                 level = dpg->level;
2139                 if ((ret = __memp_fput(dbmp,
2140                      dbc->thread_info, dpg, dbc->priority)) != 0)
2141                         goto err;
2142                 if (level == LEAFLEVEL)
2143                         continue;
2144                 if ((ret = __dbc_newopd(dbc, bo->pgno, NULL, &opd)) != 0)
2145                         return (ret);
2146                 if (!have_lock) {
2147                         /*
2148                          * The caller should have the page at
2149                          * least read locked.  Drop the buffer
2150                          * and get the write lock.
2151                          */
2152                         pgno = PGNO(pg);
2153                         if ((ret = __memp_fput(dbmp, dbc->thread_info,
2154                              pg, dbc->priority)) != 0)
2155                                 goto err;
2156                         *ppg = NULL;
2157                         if ((ret = __db_lget(dbc, 0, pgno,
2158                              DB_LOCK_WRITE, 0, &cp->csp->lock)) != 0)
2159                                 goto err;
2160                         have_lock = 1;
2161                         if ((ret = __memp_fget(dbmp, &pgno,
2162                             dbc->thread_info,
2163                             dbc->txn, DB_MPOOL_DIRTY, ppg)) != 0)
2164                                 goto err;
2165                         pg = *ppg;
2166                 }
2167                 (*donep)++;
2168                 memset(&start, 0, sizeof(start));
2169                 do {
2170                         if ((ret = __bam_compact_int(opd, &start,
2171                              NULL, factor, &span, c_data, &isdone)) != 0)
2172                                 break;
2173                 } while (!isdone);
2174
2175                 if (start.data != NULL)
2176                         __os_free(env, start.data);
2177
2178                 if (ret != 0)
2179                         goto err;
2180
2181                 ret = __dbc_close(opd);
2182                 opd = NULL;
2183                 if (ret != 0)
2184                         goto err;
2185         }
2186
2187 err:    if (opd != NULL && (t_ret = __dbc_close(opd)) != 0 && ret == 0)
2188                 ret = t_ret;
2189         return (ret);
2190 }
2191
2192 /*
2193  * __bam_truncate_page -- swap a page with a lower numbered page.
2194  *      The cusor has a stack which includes at least the
2195  * immediate parent of this page.
2196  */
2197 static int
2198 __bam_truncate_page(dbc, pgp, opg, update_parent)
2199         DBC *dbc;
2200         PAGE **pgp, *opg;
2201         int update_parent;
2202 {
2203         BTREE_CURSOR *cp;
2204         DB *dbp;
2205         DBT data, hdr;
2206         DB_LSN lsn;
2207         DB_LOCK lock;
2208         EPG *epg;
2209         PAGE *newpage;
2210         db_pgno_t newpgno, oldpgno, *pgnop;
2211         int ret;
2212
2213         DB_ASSERT(NULL, dbc != NULL);
2214         dbp = dbc->dbp;
2215         LOCK_INIT(lock);
2216
2217         /*
2218          * We want to free a page that lives in the part of the file that
2219          * can be truncated, so we're going to move it onto a free page
2220          * that is in the part of the file that need not be truncated.
2221          * Since the freelist is ordered now, we can simply call __db_new
2222          * which will grab the first element off the freelist; we know this
2223          * is the lowest numbered free page.
2224          */
2225         if ((ret = __db_new(dbc, P_DONTEXTEND | TYPE(*pgp),
2226              TYPE(*pgp) == P_LBTREE ? &lock : NULL, &newpage)) != 0)
2227                 return (ret);
2228
2229         /*
2230          * If newpage is null then __db_new would have had to allocate
2231          * a new page from the filesystem, so there is no reason
2232          * to continue this action.
2233          */
2234         if (newpage == NULL)
2235                 return (0);
2236
2237         /*
2238          * It is possible that a higher page is allocated if other threads
2239          * are allocating at the same time, if so, just put it back.
2240          */
2241         if (PGNO(newpage) > PGNO(*pgp)) {
2242                 /* Its unfortunate but you can't just free a new overflow. */
2243                 if (TYPE(newpage) == P_OVERFLOW)
2244                         OV_LEN(newpage) = 0;
2245                 if ((ret = __LPUT(dbc, lock)) != 0)
2246                         return (ret);
2247                 return (__db_free(dbc, newpage));
2248         }
2249
2250         /* Log if necessary. */
2251         if (DBC_LOGGING(dbc)) {
2252                 memset(&hdr, 0, sizeof(hdr));
2253                 hdr.data = *pgp;
2254                 hdr.size = P_OVERHEAD(dbp);
2255                 memset(&data, 0, sizeof(data));
2256                 if (TYPE(*pgp) == P_OVERFLOW) {
2257                         data.data = (u_int8_t *)*pgp + P_OVERHEAD(dbp);
2258                         data.size = OV_LEN(*pgp);
2259                 } else {
2260                         data.data = (u_int8_t *)*pgp + HOFFSET(*pgp);
2261                         data.size = dbp->pgsize - HOFFSET(*pgp);
2262                         hdr.size += NUM_ENT(*pgp) * sizeof(db_indx_t);
2263                 }
2264                 if ((ret = __bam_merge_log(dbp, dbc->txn,
2265                       &LSN(newpage), 0, PGNO(newpage), &LSN(newpage),
2266                       PGNO(*pgp), &LSN(*pgp), &hdr, &data, 1)) != 0)
2267                         goto err;
2268         } else
2269                 LSN_NOT_LOGGED(LSN(newpage));
2270
2271         oldpgno = PGNO(*pgp);
2272         newpgno = PGNO(newpage);
2273         lsn = LSN(newpage);
2274         memcpy(newpage, *pgp, dbp->pgsize);
2275         PGNO(newpage) = newpgno;
2276         LSN(newpage) = lsn;
2277
2278         /* Empty the old page. */
2279         if ((ret = __memp_dirty(dbp->mpf,
2280             pgp, dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
2281                 goto err;
2282         if (TYPE(*pgp) == P_OVERFLOW)
2283                 OV_LEN(*pgp) = 0;
2284         else {
2285                 HOFFSET(*pgp) = dbp->pgsize;
2286                 NUM_ENT(*pgp) = 0;
2287         }
2288         LSN(*pgp) = lsn;
2289
2290         /* Update siblings. */
2291         switch (TYPE(newpage)) {
2292         case P_OVERFLOW:
2293         case P_LBTREE:
2294         case P_LRECNO:
2295         case P_LDUP:
2296                 if (NEXT_PGNO(newpage) == PGNO_INVALID &&
2297                     PREV_PGNO(newpage) == PGNO_INVALID)
2298                         break;
2299                 if ((ret = __bam_relink(dbc, *pgp, opg, PGNO(newpage))) != 0)
2300                         goto err;
2301                 break;
2302         default:
2303                 break;
2304         }
2305         cp = (BTREE_CURSOR *)dbc->internal;
2306
2307         /*
2308          * Now, if we free this page, it will get truncated, when we free
2309          * all the pages after it in the file.
2310          */
2311         ret = __db_free(dbc, *pgp);
2312         /* db_free always puts the page. */
2313         *pgp = newpage;
2314
2315         if (ret != 0)
2316                 return (ret);
2317
2318         if (!update_parent)
2319                 goto done;
2320
2321         /* Update the parent. */
2322         epg = &cp->csp[-1];
2323
2324         switch (TYPE(epg->page)) {
2325         case P_IBTREE:
2326                 pgnop = &GET_BINTERNAL(dbp, epg->page, epg->indx)->pgno;
2327                 break;
2328         case P_IRECNO:
2329                 pgnop = &GET_RINTERNAL(dbp, epg->page, epg->indx)->pgno;
2330                 break;
2331         default:
2332                 pgnop = &GET_BOVERFLOW(dbp, epg->page, epg->indx)->pgno;
2333                 break;
2334         }
2335         DB_ASSERT(dbp->env, oldpgno == *pgnop);
2336         if (DBC_LOGGING(dbc)) {
2337                 if ((ret = __bam_pgno_log(dbp, dbc->txn, &LSN(epg->page),
2338                     0, PGNO(epg->page), &LSN(epg->page), (u_int32_t)epg->indx,
2339                     *pgnop, PGNO(newpage))) != 0)
2340                         return (ret);
2341         } else
2342                 LSN_NOT_LOGGED(LSN(epg->page));
2343
2344         *pgnop = PGNO(newpage);
2345         cp->csp->page = newpage;
2346         if ((ret = __TLPUT(dbc, lock)) != 0)
2347                 return (ret);
2348
2349 done:   return (0);
2350
2351 err:    (void)__memp_fput(dbp->mpf, dbc->thread_info, newpage, dbc->priority);
2352         (void)__TLPUT(dbc, lock);
2353         return (ret);
2354 }
2355
2356 /*
2357  * __bam_truncate_overflow -- find overflow pages to truncate.
2358  *      Walk the pages of an overflow chain and swap out
2359  * high numbered pages.  We are passed the first page
2360  * but only deal with the second and subsequent pages.
2361  */
2362
2363 static int
2364 __bam_truncate_overflow(dbc, pgno, ppg, c_data)
2365         DBC *dbc;
2366         db_pgno_t pgno;
2367         PAGE **ppg;
2368         DB_COMPACT *c_data;
2369 {
2370         DB *dbp;
2371         DB_LOCK lock;
2372         PAGE *page;
2373         db_pgno_t ppgno;
2374         int have_lock, ret, t_ret;
2375
2376         dbp = dbc->dbp;
2377         page = NULL;
2378         LOCK_INIT(lock);
2379         have_lock = ppg == NULL;
2380
2381         if ((ret = __memp_fget(dbp->mpf, &pgno,
2382              dbc->thread_info, dbc->txn, 0, &page)) != 0)
2383                 return (ret);
2384
2385         while ((pgno = NEXT_PGNO(page)) != PGNO_INVALID) {
2386                 if ((ret = __memp_fput(dbp->mpf,
2387                      dbc->thread_info, page, dbc->priority)) != 0)
2388                         return (ret);
2389                 if ((ret = __memp_fget(dbp->mpf, &pgno,
2390                     dbc->thread_info, dbc->txn, 0, &page)) != 0)
2391                         return (ret);
2392                 if (pgno <= c_data->compact_truncate)
2393                         continue;
2394                 if (have_lock == 0) {
2395                         ppgno = PGNO(*ppg);
2396                         if ((ret = __memp_fput(dbp->mpf, dbc->thread_info,
2397                              *ppg, dbc->priority)) != 0)
2398                                 goto err;
2399                         *ppg = NULL;
2400                         if ((ret = __db_lget(dbc, 0, ppgno,
2401                              DB_LOCK_WRITE, 0, &lock)) != 0)
2402                                 goto err;
2403                         if ((ret = __memp_fget(dbp->mpf, &ppgno,
2404                             dbc->thread_info,
2405                             dbc->txn, DB_MPOOL_DIRTY, ppg)) != 0)
2406                                 goto err;
2407                         have_lock = 1;
2408                 }
2409                 if ((ret = __bam_truncate_page(dbc, &page, NULL, 0)) != 0)
2410                         break;
2411         }
2412
2413 err:    if (page != NULL &&
2414             (t_ret = __memp_fput( dbp->mpf,
2415             dbc->thread_info, page, dbc->priority)) != 0 && ret == 0)
2416                 ret = t_ret;
2417         if ((t_ret = __TLPUT(dbc, lock)) != 0 && ret == 0)
2418                 ret = t_ret;
2419         return (ret);
2420 }
2421
2422 /*
2423  * __bam_truncate_root_page -- swap a page which is
2424  *    the root of an off page dup tree or the head of an overflow.
2425  * The page is reference by the pg/indx passed in.
2426  */
2427 static int
2428 __bam_truncate_root_page(dbc, pg, indx, c_data)
2429         DBC *dbc;
2430         PAGE *pg;
2431         u_int32_t indx;
2432         DB_COMPACT *c_data;
2433 {
2434         BINTERNAL *bi;
2435         BOVERFLOW *bo;
2436         DB *dbp;
2437         DBT orig;
2438         PAGE *page;
2439         db_pgno_t newpgno, *pgnop;
2440         int ret, t_ret;
2441
2442         COMPQUIET(c_data, NULL);
2443         COMPQUIET(bo, NULL);
2444         COMPQUIET(newpgno, PGNO_INVALID);
2445         dbp = dbc->dbp;
2446         page = NULL;
2447         if (TYPE(pg) == P_IBTREE) {
2448                 bi = GET_BINTERNAL(dbp, pg, indx);
2449                 if (B_TYPE(bi->type) == B_OVERFLOW) {
2450                         bo = (BOVERFLOW *)(bi->data);
2451                         pgnop = &bo->pgno;
2452                 } else
2453                         pgnop = &bi->pgno;
2454         } else {
2455                 bo = GET_BOVERFLOW(dbp, pg, indx);
2456                 pgnop = &bo->pgno;
2457         }
2458
2459         DB_ASSERT(dbp->env, IS_DIRTY(pg));
2460
2461         if ((ret = __memp_fget(dbp->mpf, pgnop,
2462              dbc->thread_info, dbc->txn, 0, &page)) != 0)
2463                 goto err;
2464
2465         /*
2466          * If this is a multiply reference overflow key, then we will just
2467          * copy it and decrement the reference count.  This is part of a
2468          * fix to get rid of multiple references.
2469          */
2470         if (TYPE(page) == P_OVERFLOW && OV_REF(page) > 1) {
2471                 if ((ret = __db_ovref(dbc, bo->pgno)) != 0)
2472                         goto err;
2473                 memset(&orig, 0, sizeof(orig));
2474                 if ((ret = __db_goff(dbc, &orig, bo->tlen, bo->pgno,
2475                     &orig.data, &orig.size)) == 0)
2476                         ret = __db_poff(dbc, &orig, &newpgno);
2477                 if (orig.data != NULL)
2478                         __os_free(dbp->env, orig.data);
2479                 if (ret != 0)
2480                         goto err;
2481         } else {
2482                 if ((ret = __bam_truncate_page(dbc, &page, NULL, 0)) != 0)
2483                         goto err;
2484                 newpgno = PGNO(page);
2485                 /* If we could not allocate from the free list, give up.*/
2486                 if (newpgno == *pgnop)
2487                         goto err;
2488         }
2489
2490         /* Update the reference. */
2491         if (DBC_LOGGING(dbc)) {
2492                 if ((ret = __bam_pgno_log(dbp,
2493                      dbc->txn, &LSN(pg), 0, PGNO(pg),
2494                      &LSN(pg), (u_int32_t)indx, *pgnop, newpgno)) != 0)
2495                         goto err;
2496         } else
2497                 LSN_NOT_LOGGED(LSN(pg));
2498
2499         *pgnop = newpgno;
2500
2501 err:    if (page != NULL && (t_ret =
2502               __memp_fput(dbp->mpf, dbc->thread_info,
2503                   page, dbc->priority)) != 0 && ret == 0)
2504                 ret = t_ret;
2505         return (ret);
2506 }
2507
2508 /*
2509  * -- bam_truncate_internal_overflow -- find overflow keys
2510  *      on internal pages and if they have high page
2511  * numbers swap them with lower pages and truncate them.
2512  * Note that if there are overflow keys in the internal
2513  * nodes they will get copied adding pages to the database.
2514  */
2515 static int
2516 __bam_truncate_internal_overflow(dbc, page, c_data)
2517         DBC *dbc;
2518         PAGE *page;
2519         DB_COMPACT *c_data;
2520 {
2521         BINTERNAL *bi;
2522         BOVERFLOW *bo;
2523         db_indx_t indx;
2524         int ret;
2525
2526         COMPQUIET(bo, NULL);
2527         ret = 0;
2528         for (indx = 0; indx < NUM_ENT(page); indx++) {
2529                 bi = GET_BINTERNAL(dbc->dbp, page, indx);
2530                 if (B_TYPE(bi->type) != B_OVERFLOW)
2531                         continue;
2532                 bo = (BOVERFLOW *)(bi->data);
2533                 if (bo->pgno > c_data->compact_truncate && (ret =
2534                      __bam_truncate_root_page(dbc, page, indx, c_data)) != 0)
2535                         break;
2536                 if ((ret = __bam_truncate_overflow(
2537                      dbc, bo->pgno, NULL, c_data)) != 0)
2538                         break;
2539         }
2540         return (ret);
2541 }
2542
2543 /*
2544  * __bam_compact_isdone ---
2545  *
2546  * Check to see if the stop key specified by the caller is on the
2547  * current page, in which case we are done compacting.
2548  */
2549 static int
2550 __bam_compact_isdone(dbc, stop, pg, isdone)
2551         DBC *dbc;
2552         DBT *stop;
2553         PAGE *pg;
2554         int *isdone;
2555 {
2556         db_recno_t recno;
2557         BTREE *t;
2558         BTREE_CURSOR *cp;
2559         int cmp, ret;
2560
2561         *isdone = 0;
2562         cp = (BTREE_CURSOR *)dbc->internal;
2563         t = dbc->dbp->bt_internal;
2564
2565         if (dbc->dbtype == DB_RECNO) {
2566                 if ((ret = __ram_getno(dbc, stop, &recno, 0)) != 0)
2567                         return (ret);
2568                 *isdone = cp->recno > recno;
2569         } else {
2570                 DB_ASSERT(dbc->dbp->env, TYPE(pg) == P_LBTREE);
2571                 if ((ret = __bam_cmp(dbc, stop, pg, 0,
2572                     t->bt_compare, &cmp)) != 0)
2573                         return (ret);
2574
2575                 *isdone = cmp <= 0;
2576         }
2577         return (0);
2578 }
2579
2580 /*
2581  * Lock the subtrees from the top of the stack.
2582  *      The 0'th child may be in the stack and locked otherwise iterate
2583  * through the records by calling __bam_lock_subtree.
2584  */
2585 static int
2586 __bam_lock_tree(dbc, sp, csp, start, stop)
2587         DBC *dbc;
2588         EPG *sp, *csp;
2589         u_int32_t start, stop;
2590 {
2591         PAGE *cpage;
2592         db_pgno_t pgno;
2593         int ret;
2594
2595         if (dbc->dbtype == DB_RECNO)
2596                 pgno = GET_RINTERNAL(dbc->dbp, sp->page, 0)->pgno;
2597         else
2598                 pgno = GET_BINTERNAL(dbc->dbp, sp->page, 0)->pgno;
2599         cpage = (sp + 1)->page;
2600         /*
2601          * First recurse down the left most sub tree if it is in the cursor
2602          * stack.  We already have these pages latched and locked if its a
2603          * leaf.
2604          */
2605         if (start == 0 && sp + 1 != csp && pgno == PGNO(cpage) &&
2606             (ret = __bam_lock_tree(dbc, sp + 1, csp, 0, NUM_ENT(cpage))) != 0)
2607                 return (ret);
2608
2609         /*
2610          * Then recurse on the other records on the page if needed.
2611          * If the page is in the stack then its already locked or
2612          * was processed above.
2613          */
2614         if (start == 0 && pgno == PGNO(cpage))
2615                 start = 1;
2616
2617         if (start == stop)
2618                 return (0);
2619         return (__bam_lock_subtree(dbc, sp->page, start, stop));
2620
2621 }
2622
2623 /*
2624  * Lock the subtree from the current node.
2625  */
2626 static int
2627 __bam_lock_subtree(dbc, page, indx, stop)
2628         DBC *dbc;
2629         PAGE *page;
2630         u_int32_t indx, stop;
2631 {
2632         DB *dbp;
2633         DB_LOCK lock;
2634         PAGE *cpage;
2635         db_pgno_t pgno;
2636         int ret, t_ret;
2637
2638         dbp = dbc->dbp;
2639
2640         for (; indx < stop; indx++) {
2641                 if (dbc->dbtype == DB_RECNO)
2642                         pgno = GET_RINTERNAL(dbc->dbp, page, indx)->pgno;
2643                 else
2644                         pgno = GET_BINTERNAL(dbc->dbp, page, indx)->pgno;
2645                 if (LEVEL(page) - 1 == LEAFLEVEL) {
2646                         if ((ret = __db_lget(dbc, 0, pgno,
2647                              DB_LOCK_WRITE, DB_LOCK_NOWAIT, &lock)) != 0) {
2648                                 if (ret == DB_LOCK_DEADLOCK)
2649                                         return (DB_LOCK_NOTGRANTED);
2650                                 return (ret);
2651                         }
2652                 } else {
2653                         if ((ret = __memp_fget(dbp->mpf, &pgno,
2654                              dbc->thread_info, dbc->txn, 0, &cpage)) != 0)
2655                                 return (ret);
2656                         ret = __bam_lock_subtree(dbc, cpage, 0, NUM_ENT(cpage));
2657                         if ((t_ret = __memp_fput(dbp->mpf, dbc->thread_info,
2658                             cpage, dbc->priority)) != 0 && ret == 0)
2659                                 ret = t_ret;
2660                         if (ret != 0)
2661                                 return (ret);
2662                 }
2663         }
2664         return (0);
2665 }
2666
2667 #ifdef HAVE_FTRUNCATE
2668 /*
2669  * __bam_savekey -- save the key from an internal page.
2670  *  We need to save information so that we can
2671  * fetch then next internal node of the tree.  This means
2672  * we need the btree key on this current page, or the
2673  * next record number.
2674  */
2675 static int
2676 __bam_savekey(dbc, next, start)
2677         DBC *dbc;
2678         int next;
2679         DBT *start;
2680 {
2681         BINTERNAL *bi;
2682         BKEYDATA *bk;
2683         BOVERFLOW *bo;
2684         BTREE_CURSOR *cp;
2685         DB *dbp;
2686         DB_LOCK lock;
2687         ENV *env;
2688         PAGE *pg;
2689         RINTERNAL *ri;
2690         db_indx_t indx, top;
2691         db_pgno_t pgno, saved_pgno;
2692         int ret, t_ret;
2693         u_int32_t len;
2694         u_int8_t *data;
2695         int level;
2696
2697         dbp = dbc->dbp;
2698         env = dbp->env;
2699         cp = (BTREE_CURSOR *)dbc->internal;
2700         pg = cp->csp->page;
2701         ret = 0;
2702
2703         if (dbc->dbtype == DB_RECNO) {
2704                 if (next)
2705                         for (indx = 0, top = NUM_ENT(pg); indx != top; indx++) {
2706                                 ri = GET_RINTERNAL(dbp, pg, indx);
2707                                 cp->recno += ri->nrecs;
2708                         }
2709                 return (__db_retcopy(env, start, &cp->recno,
2710                      sizeof(cp->recno), &start->data, &start->ulen));
2711
2712         }
2713         
2714         bi = GET_BINTERNAL(dbp, pg, NUM_ENT(pg) - 1);
2715         data = bi->data;
2716         len = bi->len;
2717         LOCK_INIT(lock);
2718         saved_pgno = PGNO_INVALID;
2719         /* If there is single record on the page it may have an empty key. */
2720         while (len == 0) {
2721                 /*
2722                  * We should not have an empty data page, since we just
2723                  * compacted things, check anyway and punt.
2724                  */
2725                 if (NUM_ENT(pg) == 0)
2726                         goto no_key;
2727                 pgno = bi->pgno;
2728                 level = LEVEL(pg);
2729                 if (pg != cp->csp->page &&
2730                     (ret = __memp_fput(dbp->mpf,
2731                          dbc->thread_info, pg, dbc->priority)) != 0) {
2732                         pg = NULL;
2733                         goto err;
2734                 }
2735                 if (level - 1 == LEAFLEVEL) {
2736                         TRY_LOCK(dbc, pgno, saved_pgno,
2737                             lock, DB_LOCK_READ, retry);
2738                         if (ret != 0)
2739                                 goto err;
2740                 }
2741                 if ((ret = __memp_fget(dbp->mpf, &pgno,
2742                      dbc->thread_info, dbc->txn, 0, &pg)) != 0)
2743                         goto err;
2744
2745                 /*
2746                  * At the data level use the last key to try and avoid the
2747                  * possibility that the user has a zero length key, if they
2748                  * do, we punt.
2749                  */
2750                 if (pg->level == LEAFLEVEL) {
2751                         bk = GET_BKEYDATA(dbp, pg, NUM_ENT(pg) - 2);
2752                         data = bk->data;
2753                         len = bk->len;
2754                         if (len == 0) {
2755 no_key:                         __db_errx(env,
2756                                     "Compact cannot handle zero length key");
2757                                 ret = DB_NOTFOUND;
2758                                 goto err;
2759                         }
2760                 } else {
2761                         bi = GET_BINTERNAL(dbp, pg, NUM_ENT(pg) - 1);
2762                         data = bi->data;
2763                         len = bi->len;
2764                 }
2765         }
2766         if (B_TYPE(bi->type) == B_OVERFLOW) {
2767                 bo = (BOVERFLOW *)(data);
2768                 ret = __db_goff(dbc, start, bo->tlen, bo->pgno,
2769                     &start->data, &start->ulen);
2770         }
2771         else
2772                 ret = __db_retcopy(env,
2773                      start, data, len,  &start->data, &start->ulen);
2774
2775 err:    if (pg != NULL && pg != cp->csp->page &&
2776             (t_ret = __memp_fput(dbp->mpf, dbc->thread_info,
2777                  pg, dbc->priority)) != 0 && ret == 0)
2778                 ret = t_ret;
2779         return (ret);
2780
2781 retry:  return (DB_LOCK_NOTGRANTED);
2782 }
2783
2784 /*
2785  * bam_truncate_internal --
2786  *      Find high numbered pages in the internal nodes of a tree and
2787  *      swap them.
2788  */
2789 static int
2790 __bam_truncate_internal(dbp, ip, txn, c_data)
2791         DB *dbp;
2792         DB_THREAD_INFO *ip;
2793         DB_TXN *txn;
2794         DB_COMPACT *c_data;
2795 {
2796         BTREE_CURSOR *cp;
2797         DBC *dbc;
2798         DBT start;
2799         DB_LOCK meta_lock;
2800         PAGE *pg;
2801         db_pgno_t pgno;
2802         u_int32_t sflag;
2803         int level, local_txn, ret, t_ret;
2804
2805         dbc = NULL;
2806         memset(&start, 0, sizeof(start));
2807
2808         if (IS_DB_AUTO_COMMIT(dbp, txn)) {
2809                 local_txn = 1;
2810                 txn = NULL;
2811         } else
2812                 local_txn = 0;
2813
2814         level = LEAFLEVEL + 1;
2815         sflag = CS_READ | CS_GETRECNO;
2816         LOCK_INIT(meta_lock);
2817
2818 new_txn:
2819         if (local_txn &&
2820             (ret = __txn_begin(dbp->env, ip, NULL, &txn, 0)) != 0)
2821                 goto err;
2822
2823         if ((ret = __db_cursor(dbp, ip, txn, &dbc, 0)) != 0)
2824                 goto err;
2825         cp = (BTREE_CURSOR *)dbc->internal;
2826
2827         /*
2828          * If the the root is a leaf we have nothing to do.
2829          * Searching an empty RECNO tree will return NOTFOUND below and loop.
2830          */
2831         if ((ret = __memp_fget(dbp->mpf, &cp->root, ip, txn, 0, &pg)) != 0)
2832                 goto err;
2833         if (LEVEL(pg) == LEAFLEVEL) {
2834                 ret = __memp_fput(dbp->mpf, ip, pg, dbp->priority);
2835                 goto err;
2836         }
2837         if ((ret = __memp_fput(dbp->mpf, ip, pg, dbp->priority)) != 0)
2838                 goto err;
2839
2840         pgno = PGNO_INVALID;
2841         do {
2842                 if ((ret = __bam_csearch(dbc, &start, sflag, level)) != 0) {
2843                         /* No more at this level, go up one. */
2844                         if (ret == DB_NOTFOUND) {
2845                                 level++;
2846                                 if (start.data != NULL)
2847                                         __os_free(dbp->env, start.data);
2848                                 memset(&start, 0, sizeof(start));
2849                                 sflag = CS_READ | CS_GETRECNO;
2850                                 continue;
2851                         }
2852                         goto err;
2853                 }
2854                 c_data->compact_pages_examine++;
2855
2856                 pg = cp->csp->page;
2857                 pgno = PGNO(pg);
2858
2859                 sflag = CS_NEXT | CS_GETRECNO;
2860                 /* Grab info about the page and drop the stack. */
2861                 if (pgno != cp->root && (ret = __bam_savekey(dbc,
2862                     pgno <= c_data->compact_truncate, &start)) != 0) {
2863                         if (ret == DB_LOCK_NOTGRANTED)
2864                                 continue;
2865                         goto err;
2866                 }
2867
2868                 if ((ret = __bam_stkrel(dbc, STK_NOLOCK)) != 0)
2869                         goto err;
2870                 if (pgno == cp->root)
2871                         break;
2872
2873                 if (pgno <= c_data->compact_truncate)
2874                         continue;
2875
2876                 /* Get the meta page lock before latching interior nodes. */
2877                 if (!LOCK_ISSET(meta_lock) && (ret = __db_lget(dbc,
2878                      0, PGNO_BASE_MD, DB_LOCK_WRITE, 0, &meta_lock)) != 0)
2879                         goto err;
2880
2881                 /* Reget the page with a write latch, and its parent too. */
2882                 if ((ret = __bam_csearch(dbc,
2883                     &start, CS_PARENT | CS_GETRECNO, level)) != 0) {
2884                         if (ret == DB_NOTFOUND) {
2885                                 ret = 0;
2886                         }
2887                         goto err;
2888                 }
2889                 pg = cp->csp->page;
2890                 pgno = PGNO(pg);
2891
2892                 if (pgno > c_data->compact_truncate) {
2893                         if ((ret = __bam_truncate_page(dbc, &pg, NULL, 1)) != 0)
2894                                 goto err;
2895                         if (pgno == PGNO(pg)) {
2896                                 /* We could not allocate.  Give up. */
2897                                 pgno = cp->root;
2898                         }
2899                 }
2900
2901                 if ((ret = __bam_stkrel(dbc,
2902                      pgno > c_data->compact_truncate ? 0 : STK_NOLOCK)) != 0)
2903                         goto err;
2904
2905                 /* We are locking subtrees, so drop the write locks asap. */
2906                 if (local_txn && pgno > c_data->compact_truncate)
2907                         break;
2908         } while (pgno != cp->root);
2909
2910         if ((ret = __dbc_close(dbc)) != 0)
2911                 goto err;
2912         dbc = NULL;
2913         if (local_txn) {
2914                 if ((ret = __txn_commit(txn, DB_TXN_NOSYNC)) != 0)
2915                         goto err;
2916                 txn = NULL;
2917                 LOCK_INIT(meta_lock);
2918         }
2919         if (pgno != ((BTREE *)dbp->bt_internal)->bt_root)
2920                 goto new_txn;
2921
2922 err:    if (txn != NULL && ret != 0)
2923                 sflag = STK_PGONLY;
2924         else
2925                 sflag = 0;
2926         if (txn == NULL)
2927                 if ((t_ret = __LPUT(dbc, meta_lock)) != 0 && ret == 0)
2928                         ret = t_ret;
2929         if (dbc != NULL && (t_ret = __bam_stkrel(dbc, sflag)) != 0 && ret == 0)
2930                 ret = t_ret;
2931         if (dbc != NULL && (t_ret = __dbc_close(dbc)) != 0 && ret == 0)
2932                 ret = t_ret;
2933         if (local_txn &&
2934             txn != NULL && (t_ret = __txn_abort(txn)) != 0 && ret == 0)
2935                 ret = t_ret;
2936         if (start.data != NULL)
2937                 __os_free(dbp->env, start.data);
2938         return (ret);
2939 }
2940
2941 static int
2942 __bam_setup_freelist(dbp, list, nelems)
2943         DB *dbp;
2944         db_pglist_t *list;
2945         u_int32_t nelems;
2946 {
2947         DB_MPOOLFILE *mpf;
2948         db_pgno_t *plist;
2949         int ret;
2950
2951         mpf = dbp->mpf;
2952
2953         if ((ret = __memp_alloc_freelist(mpf, nelems, &plist)) != 0)
2954                 return (ret);
2955
2956         while (nelems-- != 0)
2957                 *plist++ = list++->pgno;
2958
2959         return (0);
2960 }
2961
2962 static int
2963 __bam_free_freelist(dbp, ip, txn)
2964         DB *dbp;
2965         DB_THREAD_INFO *ip;
2966         DB_TXN *txn;
2967 {
2968         DBC *dbc;
2969         DB_LOCK lock;
2970         int auto_commit, ret, t_ret;
2971
2972         LOCK_INIT(lock);
2973         auto_commit = ret = 0;
2974
2975         /*
2976          * If we are not in a transaction then we need to get
2977          * a lock on the meta page, otherwise we should already
2978          * have the lock.
2979          */
2980
2981         dbc = NULL;
2982         if (IS_DB_AUTO_COMMIT(dbp, txn)) {
2983                 /*
2984                  * We must not timeout the lock or we will not free the list.
2985                  * We ignore errors from txn_begin as there is little that
2986                  * the application can do with the error and we want to
2987                  * get the lock and free the list if at all possible.
2988                  */
2989                 if (__txn_begin(dbp->env, ip, NULL, &txn, 0) == 0) {
2990                         (void)__lock_set_timeout(dbp->env,
2991                             txn->locker, 0, DB_SET_TXN_TIMEOUT);
2992                         (void)__lock_set_timeout(dbp->env,
2993                             txn->locker, 0, DB_SET_LOCK_TIMEOUT);
2994                         auto_commit = 1;
2995                 }
2996                 /* Get a cursor so we can call __db_lget. */
2997                 if ((ret = __db_cursor(dbp, ip, txn, &dbc, 0)) != 0)
2998                         return (ret);
2999
3000                 if ((ret = __db_lget(dbc,
3001                      0, PGNO_BASE_MD, DB_LOCK_WRITE, 0, &lock)) != 0)
3002                         goto err;
3003         }
3004
3005         ret = __memp_free_freelist(dbp->mpf);
3006
3007 err:    if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
3008                 ret = t_ret;
3009
3010         if (dbc != NULL && (t_ret = __dbc_close(dbc)) != 0 && ret == 0)
3011                 ret = t_ret;
3012
3013         if (auto_commit && __txn_abort(txn) != 0 && ret == 0)
3014                 ret = t_ret;
3015
3016         return (ret);
3017 }
3018 #endif