Remove definition of builtin function
[platform/upstream/db4.git] / btree / bt_split.c
1 /*-
2  * See the file LICENSE for redistribution information.
3  *
4  * Copyright (c) 1996-2009 Oracle.  All rights reserved.
5  */
6 /*
7  * Copyright (c) 1990, 1993, 1994, 1995, 1996
8  *      Keith Bostic.  All rights reserved.
9  */
10 /*
11  * Copyright (c) 1990, 1993, 1994, 1995
12  *      The Regents of the University of California.  All rights reserved.
13  *
14  * Redistribution and use in source and binary forms, with or without
15  * modification, are permitted provided that the following conditions
16  * are met:
17  * 1. Redistributions of source code must retain the above copyright
18  *    notice, this list of conditions and the following disclaimer.
19  * 2. Redistributions in binary form must reproduce the above copyright
20  *    notice, this list of conditions and the following disclaimer in the
21  *    documentation and/or other materials provided with the distribution.
22  * 3. Neither the name of the University nor the names of its contributors
23  *    may be used to endorse or promote products derived from this software
24  *    without specific prior written permission.
25  *
26  * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
27  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
28  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
29  * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
30  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
31  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
32  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
33  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
34  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
35  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
36  * SUCH DAMAGE.
37  *
38  * $Id$
39  */
40
41 #include "db_config.h"
42
43 #include "db_int.h"
44 #include "dbinc/db_page.h"
45 #include "dbinc/lock.h"
46 #include "dbinc/mp.h"
47 #include "dbinc/btree.h"
48
49 static int __bam_page __P((DBC *, EPG *, EPG *));
50 static int __bam_psplit __P((DBC *, EPG *, PAGE *, PAGE *, db_indx_t *));
51 static int __bam_root __P((DBC *, EPG *));
52
53 /*
54  * __bam_split --
55  *      Split a page.
56  *
57  * PUBLIC: int __bam_split __P((DBC *, void *, db_pgno_t *));
58  */
59 int
60 __bam_split(dbc, arg, root_pgnop)
61         DBC *dbc;
62         void *arg;
63         db_pgno_t *root_pgnop;
64 {
65         BTREE_CURSOR *cp;
66         DB_LOCK metalock, next_lock;
67         enum { UP, DOWN } dir;
68         db_pgno_t pgno, next_pgno, root_pgno;
69         int exact, level, ret;
70
71         cp = (BTREE_CURSOR *)dbc->internal;
72         root_pgno = cp->root;
73         LOCK_INIT(next_lock);
74         next_pgno = PGNO_INVALID;
75
76         /*
77          * First get a lock on the metadata page, we will have to allocate
78          * pages and cannot get a lock while we have the search tree pinnned.
79          */
80
81         pgno = PGNO_BASE_MD;
82         if ((ret = __db_lget(dbc,
83             0, pgno, DB_LOCK_WRITE, 0, &metalock)) != 0)
84                 goto err;
85
86         /*
87          * The locking protocol we use to avoid deadlock to acquire locks by
88          * walking down the tree, but we do it as lazily as possible, locking
89          * the root only as a last resort.  We expect all stack pages to have
90          * been discarded before we're called; we discard all short-term locks.
91          *
92          * When __bam_split is first called, we know that a leaf page was too
93          * full for an insert.  We don't know what leaf page it was, but we
94          * have the key/recno that caused the problem.  We call XX_search to
95          * reacquire the leaf page, but this time get both the leaf page and
96          * its parent, locked.  We then split the leaf page and see if the new
97          * internal key will fit into the parent page.  If it will, we're done.
98          *
99          * If it won't, we discard our current locks and repeat the process,
100          * only this time acquiring the parent page and its parent, locked.
101          * This process repeats until we succeed in the split, splitting the
102          * root page as the final resort.  The entire process then repeats,
103          * as necessary, until we split a leaf page.
104          *
105          * XXX
106          * A traditional method of speeding this up is to maintain a stack of
107          * the pages traversed in the original search.  You can detect if the
108          * stack is correct by storing the page's LSN when it was searched and
109          * comparing that LSN with the current one when it's locked during the
110          * split.  This would be an easy change for this code, but I have no
111          * numbers that indicate it's worthwhile.
112          */
113         for (dir = UP, level = LEAFLEVEL;; dir == UP ? ++level : --level) {
114                 /*
115                  * Acquire a page and its parent, locked.
116                  */
117 retry:          if ((ret = (dbc->dbtype == DB_BTREE ?
118                     __bam_search(dbc, PGNO_INVALID,
119                         arg, SR_WRPAIR, level, NULL, &exact) :
120                     __bam_rsearch(dbc,
121                         (db_recno_t *)arg, SR_WRPAIR, level, &exact))) != 0)
122                         break;
123
124                 if (cp->csp[0].page->pgno == root_pgno) {
125                         /* we can overshoot the top of the tree. */
126                         level = cp->csp[0].page->level;
127                         if (root_pgnop != NULL)
128                                 *root_pgnop = root_pgno;
129                 } else if (root_pgnop != NULL)
130                         *root_pgnop = cp->csp[-1].page->pgno;
131
132                 /*
133                  * Split the page if it still needs it (it's possible another
134                  * thread of control has already split the page).  If we are
135                  * guaranteed that two items will fit on the page, the split
136                  * is no longer necessary.
137                  */
138                 if (2 * B_MAXSIZEONPAGE(cp->ovflsize)
139                     <= (db_indx_t)P_FREESPACE(dbc->dbp, cp->csp[0].page)) {
140                         if ((ret = __bam_stkrel(dbc, STK_NOLOCK)) != 0)
141                                 goto err;
142                         goto no_split;
143                 }
144
145                 /*
146                  * We need to try to lock the next page so we can update
147                  * its PREV.
148                  */
149                 if (dbc->dbtype == DB_BTREE && ISLEAF(cp->csp->page) &&
150                     (pgno = NEXT_PGNO(cp->csp->page)) != PGNO_INVALID) {
151                         TRY_LOCK(dbc, pgno,
152                              next_pgno, next_lock, DB_LOCK_WRITE, retry);
153                         if (ret != 0)
154                                 goto err;
155                 }
156                 ret = cp->csp[0].page->pgno == root_pgno ?
157                     __bam_root(dbc, &cp->csp[0]) :
158                     __bam_page(dbc, &cp->csp[-1], &cp->csp[0]);
159                 BT_STK_CLR(cp);
160
161                 switch (ret) {
162                 case 0:
163 no_split:               /* Once we've split the leaf page, we're done. */
164                         if (level == LEAFLEVEL)
165                                 goto done;
166
167                         /* Switch directions. */
168                         if (dir == UP)
169                                 dir = DOWN;
170                         break;
171                 case DB_NEEDSPLIT:
172                         /*
173                          * It's possible to fail to split repeatedly, as other
174                          * threads may be modifying the tree, or the page usage
175                          * is sufficiently bad that we don't get enough space
176                          * the first time.
177                          */
178                         if (dir == DOWN)
179                                 dir = UP;
180                         break;
181                 default:
182                         goto err;
183                 }
184         }
185
186 err:    if (root_pgnop != NULL)
187                 *root_pgnop = cp->root;
188 done:   (void)__LPUT(dbc, metalock);
189         (void)__TLPUT(dbc, next_lock);
190         return (ret);
191 }
192
193 /*
194  * __bam_root --
195  *      Split the root page of a btree.
196  */
197 static int
198 __bam_root(dbc, cp)
199         DBC *dbc;
200         EPG *cp;
201 {
202         DB *dbp;
203         DBT log_dbt, rootent[2];
204         DB_LOCK llock, rlock;
205         DB_LSN log_lsn;
206         DB_MPOOLFILE *mpf;
207         PAGE *lp, *rp;
208         db_indx_t split;
209         u_int32_t opflags;
210         int ret, t_ret;
211
212         dbp = dbc->dbp;
213         mpf = dbp->mpf;
214         lp = rp = NULL;
215         LOCK_INIT(llock);
216         LOCK_INIT(rlock);
217         COMPQUIET(log_dbt.data, NULL);
218
219         /* Yeah, right. */
220         if (cp->page->level >= MAXBTREELEVEL) {
221                 __db_errx(dbp->env,
222                     "Too many btree levels: %d", cp->page->level);
223                 return (ENOSPC);
224         }
225
226         if ((ret = __memp_dirty(mpf,
227             &cp->page, dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
228                 goto err;
229
230         /* Create new left and right pages for the split. */
231         if ((ret = __db_new(dbc, TYPE(cp->page), &llock, &lp)) != 0 ||
232             (ret = __db_new(dbc, TYPE(cp->page), &rlock, &rp)) != 0)
233                 goto err;
234         P_INIT(lp, dbp->pgsize, lp->pgno,
235             PGNO_INVALID, ISINTERNAL(cp->page) ? PGNO_INVALID : rp->pgno,
236             cp->page->level, TYPE(cp->page));
237         P_INIT(rp, dbp->pgsize, rp->pgno,
238             ISINTERNAL(cp->page) ?  PGNO_INVALID : lp->pgno, PGNO_INVALID,
239             cp->page->level, TYPE(cp->page));
240
241         /* Split the page. */
242         if ((ret = __bam_psplit(dbc, cp, lp, rp, &split)) != 0)
243                 goto err;
244
245         if (DBC_LOGGING(dbc)) {
246                 memset(&log_dbt, 0, sizeof(log_dbt));
247                 if ((ret =
248                     __os_malloc(dbp->env, dbp->pgsize, &log_dbt.data)) != 0)
249                         goto err;
250                 log_dbt.size = dbp->pgsize;
251                 memcpy(log_dbt.data, cp->page, dbp->pgsize);
252         }
253
254         /* Clean up the new root page. */
255         if ((ret = (dbc->dbtype == DB_RECNO ?
256             __ram_root(dbc, cp->page, lp, rp) :
257             __bam_broot(dbc, cp->page, split, lp, rp))) != 0) {
258                 if (DBC_LOGGING(dbc))
259                         __os_free(dbp->env, log_dbt.data);
260                 goto err;
261         }
262
263         /* Log the change. */
264         if (DBC_LOGGING(dbc)) {
265                 memset(rootent, 0, sizeof(rootent));
266                 rootent[0].data = GET_BINTERNAL(dbp, cp->page, 0);
267                 rootent[1].data = GET_BINTERNAL(dbp, cp->page, 1);
268                 if (dbc->dbtype == DB_RECNO)
269                         rootent[0].size = rootent[1].size = RINTERNAL_SIZE;
270                 else {
271                         rootent[0].size = BINTERNAL_SIZE(
272                             ((BINTERNAL *)rootent[0].data)->len);
273                         rootent[1].size = BINTERNAL_SIZE(
274                             ((BINTERNAL *)rootent[1].data)->len);
275                 }
276                 ZERO_LSN(log_lsn);
277                 opflags = F_ISSET(
278                     (BTREE_CURSOR *)dbc->internal, C_RECNUM) ? SPL_NRECS : 0;
279                 if (dbc->dbtype == DB_RECNO)
280                         opflags |= SPL_RECNO;
281                 ret = __bam_split_log(dbp,
282                     dbc->txn, &LSN(cp->page), 0, PGNO(lp), &LSN(lp), PGNO(rp),
283                     &LSN(rp), (u_int32_t)NUM_ENT(lp), PGNO_INVALID, &log_lsn,
284                     dbc->internal->root, &LSN(cp->page), 0,
285                     &log_dbt, &rootent[0], &rootent[1], opflags);
286
287                 __os_free(dbp->env, log_dbt.data);
288
289                 if (ret != 0)
290                         goto err;
291         } else
292                 LSN_NOT_LOGGED(LSN(cp->page));
293         LSN(lp) = LSN(cp->page);
294         LSN(rp) = LSN(cp->page);
295
296         /* Adjust any cursors. */
297         ret = __bam_ca_split(dbc, cp->page->pgno, lp->pgno, rp->pgno, split, 1);
298
299         /* Success or error: release pages and locks. */
300 err:    if (cp->page != NULL && (t_ret = __memp_fput(mpf,
301              dbc->thread_info, cp->page, dbc->priority)) != 0 && ret == 0)
302                 ret = t_ret;
303         cp->page = NULL;
304
305         /*
306          * We are done.  Put or downgrade all our locks and release
307          * the pages.
308          */
309         if ((t_ret = __TLPUT(dbc, llock)) != 0 && ret == 0)
310                 ret = t_ret;
311         if ((t_ret = __TLPUT(dbc, rlock)) != 0 && ret == 0)
312                 ret = t_ret;
313         if ((t_ret = __TLPUT(dbc, cp->lock)) != 0 && ret == 0)
314                 ret = t_ret;
315         if (lp != NULL && (t_ret = __memp_fput(mpf,
316              dbc->thread_info, lp, dbc->priority)) != 0 && ret == 0)
317                 ret = t_ret;
318         if (rp != NULL && (t_ret = __memp_fput(mpf,
319              dbc->thread_info, rp, dbc->priority)) != 0 && ret == 0)
320                 ret = t_ret;
321
322         return (ret);
323 }
324
325 /*
326  * __bam_page --
327  *      Split the non-root page of a btree.
328  */
329 static int
330 __bam_page(dbc, pp, cp)
331         DBC *dbc;
332         EPG *pp, *cp;
333 {
334         BTREE_CURSOR *bc;
335         DB *dbp;
336         DBT log_dbt, rentry;
337         DB_LOCK rplock;
338         DB_LSN log_lsn;
339         DB_LSN save_lsn;
340         DB_MPOOLFILE *mpf;
341         PAGE *lp, *rp, *alloc_rp, *tp;
342         db_indx_t split;
343         u_int32_t opflags;
344         int ret, t_ret;
345
346         dbp = dbc->dbp;
347         mpf = dbp->mpf;
348         alloc_rp = lp = rp = tp = NULL;
349         LOCK_INIT(rplock);
350         ret = -1;
351
352         /*
353          * Create new left page for the split, and fill in everything
354          * except its LSN and next-page page number.
355          *
356          * Create a new right page for the split, and fill in everything
357          * except its LSN and page number.
358          *
359          * We malloc space for both the left and right pages, so we don't get
360          * a new page from the underlying buffer pool until we know the split
361          * is going to succeed.  The reason is that we can't release locks
362          * acquired during the get-a-new-page process because metadata page
363          * locks can't be discarded on failure since we may have modified the
364          * free list.  So, if you assume that we're holding a write lock on the
365          * leaf page which ran out of space and started this split (e.g., we
366          * have already written records to the page, or we retrieved a record
367          * from it with the DB_RMW flag set), failing in a split with both a
368          * leaf page locked and the metadata page locked can potentially lock
369          * up the tree badly, because we've violated the rule of always locking
370          * down the tree, and never up.
371          */
372         if ((ret = __os_malloc(dbp->env, dbp->pgsize * 2, &lp)) != 0)
373                 goto err;
374         P_INIT(lp, dbp->pgsize, PGNO(cp->page),
375             ISINTERNAL(cp->page) ?  PGNO_INVALID : PREV_PGNO(cp->page),
376             ISINTERNAL(cp->page) ?  PGNO_INVALID : 0,
377             cp->page->level, TYPE(cp->page));
378
379         rp = (PAGE *)((u_int8_t *)lp + dbp->pgsize);
380         P_INIT(rp, dbp->pgsize, 0,
381             ISINTERNAL(cp->page) ? PGNO_INVALID : PGNO(cp->page),
382             ISINTERNAL(cp->page) ? PGNO_INVALID : NEXT_PGNO(cp->page),
383             cp->page->level, TYPE(cp->page));
384
385         /*
386          * Split right.
387          *
388          * Only the indices are sorted on the page, i.e., the key/data pairs
389          * aren't, so it's simpler to copy the data from the split page onto
390          * two new pages instead of copying half the data to a new right page
391          * and compacting the left page in place.  Since the left page can't
392          * change, we swap the original and the allocated left page after the
393          * split.
394          */
395         if ((ret = __bam_psplit(dbc, cp, lp, rp, &split)) != 0)
396                 goto err;
397
398         /*
399          * Test to see if we are going to be able to insert the new pages into
400          * the parent page.  The interesting failure here is that the parent
401          * page can't hold the new keys, and has to be split in turn, in which
402          * case we want to release all the locks we can.
403          */
404         if ((ret = __bam_pinsert(dbc, pp, split, lp, rp, BPI_SPACEONLY)) != 0)
405                 goto err;
406
407         /*
408          * We've got everything locked down we need, and we know the split
409          * is going to succeed.  Go and get the additional page we'll need.
410          */
411         if ((ret = __db_new(dbc, TYPE(cp->page), &rplock, &alloc_rp)) != 0)
412                 goto err;
413
414         /*
415          * Prepare to fix up the previous pointer of any leaf page following
416          * the split page.  Our caller has already write locked the page so
417          * we can get it without deadlocking on the parent latch.
418          */
419         if (ISLEAF(cp->page) && NEXT_PGNO(cp->page) != PGNO_INVALID &&
420             (ret = __memp_fget(mpf, &NEXT_PGNO(cp->page),
421             dbc->thread_info, dbc->txn, DB_MPOOL_DIRTY, &tp)) != 0)
422                 goto err;
423
424         /*
425          * Fix up the page numbers we didn't have before.  We have to do this
426          * before calling __bam_pinsert because it may copy a page number onto
427          * the parent page and it takes the page number from its page argument.
428          */
429         PGNO(rp) = NEXT_PGNO(lp) = PGNO(alloc_rp);
430
431         DB_ASSERT(dbp->env, IS_DIRTY(cp->page));
432         DB_ASSERT(dbp->env, IS_DIRTY(pp->page));
433
434         /* Actually update the parent page. */
435         if ((ret = __bam_pinsert(dbc, pp, split, lp, rp, BPI_NOLOGGING)) != 0)
436                 goto err;
437
438         bc = (BTREE_CURSOR *)dbc->internal;
439         /* Log the change. */
440         if (DBC_LOGGING(dbc)) {
441                 memset(&log_dbt, 0, sizeof(log_dbt));
442                 log_dbt.data = cp->page;
443                 log_dbt.size = dbp->pgsize;
444                 memset(&rentry, 0, sizeof(rentry));
445                 rentry.data = GET_BINTERNAL(dbp, pp->page, pp->indx + 1);
446                 opflags = F_ISSET(bc, C_RECNUM) ? SPL_NRECS : 0;
447                 if (dbc->dbtype == DB_RECNO) {
448                         opflags |= SPL_RECNO;
449                         rentry.size = RINTERNAL_SIZE;
450                 } else
451                         rentry.size =
452                             BINTERNAL_SIZE(((BINTERNAL *)rentry.data)->len);
453                 if (tp == NULL)
454                         ZERO_LSN(log_lsn);
455                 if ((ret = __bam_split_log(dbp, dbc->txn, &LSN(cp->page), 0,
456                     PGNO(cp->page), &LSN(cp->page), PGNO(alloc_rp),
457                     &LSN(alloc_rp), (u_int32_t)NUM_ENT(lp),
458                     tp == NULL ? 0 : PGNO(tp), tp == NULL ? &log_lsn : &LSN(tp),
459                     PGNO(pp->page), &LSN(pp->page), pp->indx,
460                     &log_dbt, NULL, &rentry, opflags)) != 0) {
461                         /*
462                          * Undo the update to the parent page, which has not
463                          * been logged yet. This must succeed.
464                          */
465                         t_ret = __db_ditem_nolog(dbc, pp->page,
466                             pp->indx + 1, rentry.size);
467                         DB_ASSERT(dbp->env, t_ret == 0);
468
469                         goto err;
470                 }
471
472         } else
473                 LSN_NOT_LOGGED(LSN(cp->page));
474
475         /* Update the LSNs for all involved pages. */
476         LSN(alloc_rp) = LSN(cp->page);
477         LSN(lp) = LSN(cp->page);
478         LSN(rp) = LSN(cp->page);
479         LSN(pp->page) = LSN(cp->page);
480         if (tp != NULL) {
481                 /* Log record has been written; now it is safe to update next page. */
482                 PREV_PGNO(tp) = PGNO(rp);
483                 LSN(tp) = LSN(cp->page);
484         }
485
486         /*
487          * Copy the left and right pages into place.  There are two paths
488          * through here.  Either we are logging and we set the LSNs in the
489          * logging path.  However, if we are not logging, then we do not
490          * have valid LSNs on lp or rp.  The correct LSNs to use are the
491          * ones on the page we got from __db_new or the one that was
492          * originally on cp->page.  In both cases, we save the LSN from the
493          * real database page (not a malloc'd one) and reapply it after we
494          * do the copy.
495          */
496         save_lsn = alloc_rp->lsn;
497         memcpy(alloc_rp, rp, LOFFSET(dbp, rp));
498         memcpy((u_int8_t *)alloc_rp + HOFFSET(rp),
499             (u_int8_t *)rp + HOFFSET(rp), dbp->pgsize - HOFFSET(rp));
500         alloc_rp->lsn = save_lsn;
501
502         save_lsn = cp->page->lsn;
503         memcpy(cp->page, lp, LOFFSET(dbp, lp));
504         memcpy((u_int8_t *)cp->page + HOFFSET(lp),
505             (u_int8_t *)lp + HOFFSET(lp), dbp->pgsize - HOFFSET(lp));
506         cp->page->lsn = save_lsn;
507
508         /* Adjust any cursors. */
509         if ((ret = __bam_ca_split(dbc,
510             PGNO(cp->page), PGNO(cp->page), PGNO(rp), split, 0)) != 0)
511                 goto err;
512
513         __os_free(dbp->env, lp);
514
515         /*
516          * Success -- write the real pages back to the store.
517          */
518         if ((t_ret = __memp_fput(mpf,
519             dbc->thread_info, alloc_rp, dbc->priority)) != 0 && ret == 0)
520                 ret = t_ret;
521         if ((t_ret = __TLPUT(dbc, rplock)) != 0 && ret == 0)
522                 ret = t_ret;
523         if (tp != NULL) {
524                 if ((t_ret = __memp_fput(mpf,
525                     dbc->thread_info, tp, dbc->priority)) != 0 && ret == 0)
526                         ret = t_ret;
527         }
528         if ((t_ret = __bam_stkrel(dbc, STK_CLRDBC)) != 0 && ret == 0)
529                 ret = t_ret;
530         return (ret);
531
532 err:    if (lp != NULL)
533                 __os_free(dbp->env, lp);
534         if (alloc_rp != NULL)
535                 (void)__memp_fput(mpf,
536                     dbc->thread_info, alloc_rp, dbc->priority);
537         if (tp != NULL)
538                 (void)__memp_fput(mpf, dbc->thread_info, tp, dbc->priority);
539
540         if (pp->page != NULL)
541                 (void)__memp_fput(mpf,
542                      dbc->thread_info, pp->page, dbc->priority);
543
544         if (ret == DB_NEEDSPLIT)
545                 (void)__LPUT(dbc, pp->lock);
546         else
547                 (void)__TLPUT(dbc, pp->lock);
548
549         (void)__memp_fput(mpf, dbc->thread_info, cp->page, dbc->priority);
550
551         /*
552          * We don't drop the left and right page locks.  If we doing dirty
553          * reads then we need to hold the locks until we abort the transaction.
554          * If we are not transactional, we are hosed anyway as the tree
555          * is trashed.  It may be better not to leak the locks.
556          */
557
558         if (dbc->txn == NULL)
559                 (void)__LPUT(dbc, rplock);
560
561         if (dbc->txn == NULL || ret == DB_NEEDSPLIT)
562                 (void)__LPUT(dbc, cp->lock);
563
564         return (ret);
565 }
566
567 /*
568  * __bam_broot --
569  *      Fix up the btree root page after it has been split.
570  * PUBLIC: int __bam_broot __P((DBC *, PAGE *, u_int32_t, PAGE *, PAGE *));
571  */
572 int
573 __bam_broot(dbc, rootp, split, lp, rp)
574         DBC *dbc;
575         u_int32_t split;
576         PAGE *rootp, *lp, *rp;
577 {
578         BINTERNAL bi, bi0, *child_bi;
579         BKEYDATA *child_bk;
580         BOVERFLOW bo, *child_bo;
581         BTREE_CURSOR *cp;
582         DB *dbp;
583         DBT hdr, hdr0, data;
584         db_pgno_t root_pgno;
585         int ret;
586
587         dbp = dbc->dbp;
588         cp = (BTREE_CURSOR *)dbc->internal;
589         child_bo = NULL;
590         data.data = NULL;
591         memset(&bi, 0, sizeof(bi));
592
593         switch (TYPE(rootp)) {
594         case P_IBTREE:
595                 /* Copy the first key of the child page onto the root page. */
596                 child_bi = GET_BINTERNAL(dbp, rootp, split);
597                 switch (B_TYPE(child_bi->type)) {
598                 case B_KEYDATA:
599                         bi.len = child_bi->len;
600                         B_TSET(bi.type, B_KEYDATA);
601                         bi.pgno = rp->pgno;
602                         DB_SET_DBT(hdr, &bi, SSZA(BINTERNAL, data));
603                         if ((ret = __os_malloc(dbp->env,
604                             child_bi->len, &data.data)) != 0)
605                                 return (ret);
606                         memcpy(data.data, child_bi->data, child_bi->len);
607                         data.size = child_bi->len;
608                         break;
609                 case B_OVERFLOW:
610                         /* Reuse the overflow key. */
611                         child_bo = (BOVERFLOW *)child_bi->data;
612                         memset(&bo, 0, sizeof(bo));
613                         bo.type = B_OVERFLOW;
614                         bo.tlen = child_bo->tlen;
615                         bo.pgno = child_bo->pgno;
616                         bi.len = BOVERFLOW_SIZE;
617                         B_TSET(bi.type, B_OVERFLOW);
618                         bi.pgno = rp->pgno;
619                         DB_SET_DBT(hdr, &bi, SSZA(BINTERNAL, data));
620                         DB_SET_DBT(data, &bo, BOVERFLOW_SIZE);
621                         break;
622                 case B_DUPLICATE:
623                 default:
624                         goto pgfmt;
625                 }
626                 break;
627         case P_LDUP:
628         case P_LBTREE:
629                 /* Copy the first key of the child page onto the root page. */
630                 child_bk = GET_BKEYDATA(dbp, rootp, split);
631                 switch (B_TYPE(child_bk->type)) {
632                 case B_KEYDATA:
633                         bi.len = child_bk->len;
634                         B_TSET(bi.type, B_KEYDATA);
635                         bi.pgno = rp->pgno;
636                         DB_SET_DBT(hdr, &bi, SSZA(BINTERNAL, data));
637                         if ((ret = __os_malloc(dbp->env,
638                              child_bk->len, &data.data)) != 0)
639                                 return (ret);
640                         memcpy(data.data, child_bk->data, child_bk->len);
641                         data.size = child_bk->len;
642                         break;
643                 case B_OVERFLOW:
644                         /* Copy the overflow key. */
645                         child_bo = (BOVERFLOW *)child_bk;
646                         memset(&bo, 0, sizeof(bo));
647                         bo.type = B_OVERFLOW;
648                         bo.tlen = child_bo->tlen;
649                         memset(&hdr, 0, sizeof(hdr));
650                         if ((ret = __db_goff(dbc, &hdr, child_bo->tlen,
651                              child_bo->pgno, &hdr.data, &hdr.size)) == 0)
652                                 ret = __db_poff(dbc, &hdr, &bo.pgno);
653
654                         if (hdr.data != NULL)
655                                 __os_free(dbp->env, hdr.data);
656                         if (ret != 0)
657                                 return (ret);
658
659                         bi.len = BOVERFLOW_SIZE;
660                         B_TSET(bi.type, B_OVERFLOW);
661                         bi.pgno = rp->pgno;
662                         DB_SET_DBT(hdr, &bi, SSZA(BINTERNAL, data));
663                         DB_SET_DBT(data, &bo, BOVERFLOW_SIZE);
664                         break;
665                 case B_DUPLICATE:
666                 default:
667                         goto pgfmt;
668                 }
669                 break;
670         default:
671 pgfmt:          return (__db_pgfmt(dbp->env, rp->pgno));
672         }
673         /*
674          * If the root page was a leaf page, change it into an internal page.
675          * We copy the key we split on (but not the key's data, in the case of
676          * a leaf page) to the new root page.
677          */
678         root_pgno = cp->root;
679         P_INIT(rootp, dbp->pgsize,
680             root_pgno, PGNO_INVALID, PGNO_INVALID, lp->level + 1, P_IBTREE);
681
682         /*
683          * The btree comparison code guarantees that the left-most key on any
684          * internal btree page is never used, so it doesn't need to be filled
685          * in.  Set the record count if necessary.
686          */
687         memset(&bi0, 0, sizeof(bi0));
688         B_TSET(bi0.type, B_KEYDATA);
689         bi0.pgno = lp->pgno;
690         if (F_ISSET(cp, C_RECNUM)) {
691                 bi0.nrecs = __bam_total(dbp, lp);
692                 RE_NREC_SET(rootp, bi0.nrecs);
693                 bi.nrecs = __bam_total(dbp, rp);
694                 RE_NREC_ADJ(rootp, bi.nrecs);
695         }
696         DB_SET_DBT(hdr0, &bi0, SSZA(BINTERNAL, data));
697         if ((ret = __db_pitem_nolog(dbc, rootp,
698             0, BINTERNAL_SIZE(0), &hdr0, NULL)) != 0)
699                 goto err;
700         ret = __db_pitem_nolog(dbc, rootp, 1,
701             BINTERNAL_SIZE(data.size), &hdr, &data);
702
703 err:    if (data.data != NULL && child_bo == NULL)
704                 __os_free(dbp->env, data.data);
705         return (ret);
706 }
707
708 /*
709  * __ram_root --
710  *      Fix up the recno root page after it has been split.
711  * PUBLIC:  int __ram_root __P((DBC *, PAGE *, PAGE *, PAGE *));
712  */
713 int
714 __ram_root(dbc, rootp, lp, rp)
715         DBC *dbc;
716         PAGE *rootp, *lp, *rp;
717 {
718         DB *dbp;
719         DBT hdr;
720         RINTERNAL ri;
721         db_pgno_t root_pgno;
722         int ret;
723
724         dbp = dbc->dbp;
725         root_pgno = dbc->internal->root;
726
727         /* Initialize the page. */
728         P_INIT(rootp, dbp->pgsize,
729             root_pgno, PGNO_INVALID, PGNO_INVALID, lp->level + 1, P_IRECNO);
730
731         /* Initialize the header. */
732         DB_SET_DBT(hdr, &ri, RINTERNAL_SIZE);
733
734         /* Insert the left and right keys, set the header information. */
735         ri.pgno = lp->pgno;
736         ri.nrecs = __bam_total(dbp, lp);
737         if ((ret = __db_pitem_nolog(dbc,
738              rootp, 0, RINTERNAL_SIZE, &hdr, NULL)) != 0)
739                 return (ret);
740         RE_NREC_SET(rootp, ri.nrecs);
741         ri.pgno = rp->pgno;
742         ri.nrecs = __bam_total(dbp, rp);
743         if ((ret = __db_pitem_nolog(dbc,
744             rootp, 1, RINTERNAL_SIZE, &hdr, NULL)) != 0)
745                 return (ret);
746         RE_NREC_ADJ(rootp, ri.nrecs);
747         return (0);
748 }
749
750 /*
751  * __bam_pinsert --
752  *      Insert a new key into a parent page, completing the split.
753  *
754  * PUBLIC: int __bam_pinsert
755  * PUBLIC:     __P((DBC *, EPG *, u_int32_t, PAGE *, PAGE *, int));
756  */
757 int
758 __bam_pinsert(dbc, parent, split, lchild, rchild, flags)
759         DBC *dbc;
760         EPG *parent;
761         u_int32_t split;
762         PAGE *lchild, *rchild;
763         int flags;
764 {
765         BINTERNAL bi, *child_bi;
766         BKEYDATA *child_bk, *tmp_bk;
767         BOVERFLOW bo, *child_bo;
768         BTREE *t;
769         BTREE_CURSOR *cp;
770         DB *dbp;
771         DBT a, b, hdr, data;
772         EPG *child;
773         PAGE *ppage;
774         RINTERNAL ri;
775         db_indx_t off;
776         db_recno_t nrecs;
777         size_t (*func) __P((DB *, const DBT *, const DBT *));
778         int (*pitem) __P((DBC *, PAGE *, u_int32_t, u_int32_t, DBT *, DBT *));
779         u_int32_t n, nbytes, nksize, oldsize, size;
780         int ret;
781
782         dbp = dbc->dbp;
783         cp = (BTREE_CURSOR *)dbc->internal;
784         t = dbp->bt_internal;
785         ppage = parent->page;
786         child = parent + 1;
787
788         /* If handling record numbers, count records split to the right page. */
789         nrecs = F_ISSET(cp, C_RECNUM) &&
790             !LF_ISSET(BPI_SPACEONLY) ? __bam_total(dbp, rchild) : 0;
791
792         /*
793          * Now we insert the new page's first key into the parent page, which
794          * completes the split.  The parent points to a PAGE and a page index
795          * offset, where the new key goes ONE AFTER the index, because we split
796          * to the right.
797          *
798          * XXX
799          * Some btree algorithms replace the key for the old page as well as
800          * the new page.  We don't, as there's no reason to believe that the
801          * first key on the old page is any better than the key we have, and,
802          * in the case of a key being placed at index 0 causing the split, the
803          * key is unavailable.
804          */
805         off = parent->indx + O_INDX;
806         if (LF_ISSET(BPI_REPLACE))
807                 oldsize = TYPE(ppage) == P_IRECNO ? RINTERNAL_PSIZE :
808                     BINTERNAL_PSIZE(GET_BINTERNAL(dbp, ppage, off)->len);
809         else
810                 oldsize = 0;
811
812         /*
813          * Calculate the space needed on the parent page.
814          *
815          * Prefix trees: space hack used when inserting into BINTERNAL pages.
816          * Retain only what's needed to distinguish between the new entry and
817          * the LAST entry on the page to its left.  If the keys compare equal,
818          * retain the entire key.  We ignore overflow keys, and the entire key
819          * must be retained for the next-to-leftmost key on the leftmost page
820          * of each level, or the search will fail.  Applicable ONLY to internal
821          * pages that have leaf pages as children.  Further reduction of the
822          * key between pairs of internal pages loses too much information.
823          */
824         switch (TYPE(child->page)) {
825         case P_IBTREE:
826                 child_bi = GET_BINTERNAL(dbp, child->page, split);
827                 nbytes = BINTERNAL_PSIZE(child_bi->len);
828
829                 if (P_FREESPACE(dbp, ppage) + oldsize < nbytes)
830                         return (DB_NEEDSPLIT);
831                 if (LF_ISSET(BPI_SPACEONLY))
832                         return (0);
833
834                 switch (B_TYPE(child_bi->type)) {
835                 case B_KEYDATA:
836                         /* Add a new record for the right page. */
837                         memset(&bi, 0, sizeof(bi));
838                         bi.len = child_bi->len;
839                         B_TSET(bi.type, B_KEYDATA);
840                         bi.pgno = rchild->pgno;
841                         bi.nrecs = nrecs;
842                         DB_SET_DBT(hdr, &bi, SSZA(BINTERNAL, data));
843                         DB_SET_DBT(data, child_bi->data, child_bi->len);
844                         size = BINTERNAL_SIZE(child_bi->len);
845                         break;
846                 case B_OVERFLOW:
847                         /* Reuse the overflow key. */
848                         child_bo = (BOVERFLOW *)child_bi->data;
849                         memset(&bo, 0, sizeof(bo));
850                         bo.type = B_OVERFLOW;
851                         bo.tlen = child_bo->tlen;
852                         bo.pgno = child_bo->pgno;
853                         bi.len = BOVERFLOW_SIZE;
854                         B_TSET(bi.type, B_OVERFLOW);
855                         bi.pgno = rchild->pgno;
856                         bi.nrecs = nrecs;
857                         DB_SET_DBT(hdr, &bi, SSZA(BINTERNAL, data));
858                         DB_SET_DBT(data, &bo, BOVERFLOW_SIZE);
859                         size = BINTERNAL_SIZE(BOVERFLOW_SIZE);
860                         break;
861                 case B_DUPLICATE:
862                 default:
863                         goto pgfmt;
864                 }
865                 break;
866         case P_LDUP:
867         case P_LBTREE:
868                 child_bk = GET_BKEYDATA(dbp, child->page, split);
869                 switch (B_TYPE(child_bk->type)) {
870                 case B_KEYDATA:
871                         nbytes = BINTERNAL_PSIZE(child_bk->len);
872                         nksize = child_bk->len;
873
874                         /*
875                          * Prefix compression:
876                          * We set t->bt_prefix to NULL if we have a comparison
877                          * callback but no prefix compression callback.  But,
878                          * if we're splitting in an off-page duplicates tree,
879                          * we still have to do some checking.  If using the
880                          * default off-page duplicates comparison routine we
881                          * can use the default prefix compression callback. If
882                          * not using the default off-page duplicates comparison
883                          * routine, we can't do any kind of prefix compression
884                          * as there's no way for an application to specify a
885                          * prefix compression callback that corresponds to its
886                          * comparison callback.
887                          *
888                          * No prefix compression if we don't have a compression
889                          * function, or the key we'd compress isn't a normal
890                          * key (for example, it references an overflow page).
891                          *
892                          * Generate a parent page key for the right child page
893                          * from a comparison of the last key on the left child
894                          * page and the first key on the right child page.
895                          */
896                         if (F_ISSET(dbc, DBC_OPD)) {
897                                 if (dbp->dup_compare == __bam_defcmp)
898                                         func = __bam_defpfx;
899                                 else
900                                         func = NULL;
901                         } else
902                                 func = t->bt_prefix;
903                         if (func == NULL)
904                                 goto noprefix;
905                         tmp_bk = GET_BKEYDATA(dbp, lchild, NUM_ENT(lchild) -
906                             (TYPE(lchild) == P_LDUP ? O_INDX : P_INDX));
907                         if (B_TYPE(tmp_bk->type) != B_KEYDATA)
908                                 goto noprefix;
909                         DB_INIT_DBT(a, tmp_bk->data, tmp_bk->len);
910                         DB_INIT_DBT(b, child_bk->data, child_bk->len);
911                         nksize = (u_int32_t)func(dbp, &a, &b);
912                         if ((n = BINTERNAL_PSIZE(nksize)) < nbytes)
913                                 nbytes = n;
914                         else
915                                 nksize = child_bk->len;
916
917 noprefix:               if (P_FREESPACE(dbp, ppage) + oldsize < nbytes)
918                                 return (DB_NEEDSPLIT);
919                         if (LF_ISSET(BPI_SPACEONLY))
920                                 return (0);
921
922                         memset(&bi, 0, sizeof(bi));
923                         bi.len = nksize;
924                         B_TSET(bi.type, B_KEYDATA);
925                         bi.pgno = rchild->pgno;
926                         bi.nrecs = nrecs;
927                         DB_SET_DBT(hdr, &bi, SSZA(BINTERNAL, data));
928                         DB_SET_DBT(data, child_bk->data, nksize);
929                         size = BINTERNAL_SIZE(nksize);
930                         break;
931                 case B_OVERFLOW:
932                         nbytes = BINTERNAL_PSIZE(BOVERFLOW_SIZE);
933
934                         if (P_FREESPACE(dbp, ppage) + oldsize < nbytes)
935                                 return (DB_NEEDSPLIT);
936                         if (LF_ISSET(BPI_SPACEONLY))
937                                 return (0);
938
939                         /* Copy the overflow key. */
940                         child_bo = (BOVERFLOW *)child_bk;
941                         memset(&bo, 0, sizeof(bo));
942                         bo.type = B_OVERFLOW;
943                         bo.tlen = child_bo->tlen;
944                         memset(&hdr, 0, sizeof(hdr));
945                         if ((ret = __db_goff(dbc, &hdr, child_bo->tlen,
946                              child_bo->pgno, &hdr.data, &hdr.size)) == 0)
947                                 ret = __db_poff(dbc, &hdr, &bo.pgno);
948
949                         if (hdr.data != NULL)
950                                 __os_free(dbp->env, hdr.data);
951                         if (ret != 0)
952                                 return (ret);
953
954                         memset(&bi, 0, sizeof(bi));
955                         bi.len = BOVERFLOW_SIZE;
956                         B_TSET(bi.type, B_OVERFLOW);
957                         bi.pgno = rchild->pgno;
958                         bi.nrecs = nrecs;
959                         DB_SET_DBT(hdr, &bi, SSZA(BINTERNAL, data));
960                         DB_SET_DBT(data, &bo, BOVERFLOW_SIZE);
961                         size = BINTERNAL_SIZE(BOVERFLOW_SIZE);
962
963                         break;
964                 case B_DUPLICATE:
965                 default:
966                         goto pgfmt;
967                 }
968                 break;
969         case P_IRECNO:
970         case P_LRECNO:
971                 nbytes = RINTERNAL_PSIZE;
972
973                 if (P_FREESPACE(dbp, ppage) + oldsize < nbytes)
974                         return (DB_NEEDSPLIT);
975                 if (LF_ISSET(BPI_SPACEONLY))
976                         return (0);
977
978                 /* Add a new record for the right page. */
979                 DB_SET_DBT(hdr, &ri, RINTERNAL_SIZE);
980                 ri.pgno = rchild->pgno;
981                 ri.nrecs = nrecs;
982                 size = RINTERNAL_SIZE;
983                 data.size = 0;
984                 /*
985                  * For now, we are locking internal recno nodes so
986                  * use two steps.
987                  */
988                 if (LF_ISSET(BPI_REPLACE)) {
989                         if ((ret = __bam_ditem(dbc, ppage, off)) != 0)
990                                 return (ret);
991                         LF_CLR(BPI_REPLACE);
992                 }
993                 break;
994         default:
995 pgfmt:          return (__db_pgfmt(dbp->env, PGNO(child->page)));
996         }
997
998         if (LF_ISSET(BPI_REPLACE)) {
999                 DB_ASSERT(dbp->env, !LF_ISSET(BPI_NOLOGGING));
1000                 if ((ret = __bam_irep(dbc, ppage,
1001                     off, &hdr, data.size != 0 ? &data : NULL)) != 0)
1002                         return (ret);
1003         } else {
1004                 if (LF_ISSET(BPI_NOLOGGING))
1005                         pitem = __db_pitem_nolog;
1006                 else
1007                         pitem = __db_pitem;
1008
1009                 if ((ret = pitem(dbc, ppage,
1010                     off, size, &hdr, data.size != 0 ? &data : NULL)) != 0)
1011                         return (ret);
1012         }
1013
1014         /*
1015          * If a Recno or Btree with record numbers AM page, or an off-page
1016          * duplicates tree, adjust the parent page's left page record count.
1017          */
1018         if (F_ISSET(cp, C_RECNUM) && !LF_ISSET(BPI_NORECNUM)) {
1019                 /* Log the change. */
1020                 if (DBC_LOGGING(dbc)) {
1021                         if ((ret = __bam_cadjust_log(dbp, dbc->txn,
1022                             &LSN(ppage), 0, PGNO(ppage), &LSN(ppage),
1023                             parent->indx, -(int32_t)nrecs, 0)) != 0)
1024                                 return (ret);
1025                 } else
1026                         LSN_NOT_LOGGED(LSN(ppage));
1027
1028                 /* Update the left page count. */
1029                 if (dbc->dbtype == DB_RECNO)
1030                         GET_RINTERNAL(dbp, ppage, parent->indx)->nrecs -= nrecs;
1031                 else
1032                         GET_BINTERNAL(dbp, ppage, parent->indx)->nrecs -= nrecs;
1033         }
1034
1035         return (0);
1036 }
1037
1038 /*
1039  * __bam_psplit --
1040  *      Do the real work of splitting the page.
1041  */
1042 static int
1043 __bam_psplit(dbc, cp, lp, rp, splitret)
1044         DBC *dbc;
1045         EPG *cp;
1046         PAGE *lp, *rp;
1047         db_indx_t *splitret;
1048 {
1049         DB *dbp;
1050         PAGE *pp;
1051         db_indx_t half, *inp, nbytes, off, splitp, top;
1052         int adjust, cnt, iflag, isbigkey, ret;
1053
1054         dbp = dbc->dbp;
1055         pp = cp->page;
1056         inp = P_INP(dbp, pp);
1057         adjust = TYPE(pp) == P_LBTREE ? P_INDX : O_INDX;
1058
1059         /*
1060          * If we're splitting the first (last) page on a level because we're
1061          * inserting (appending) a key to it, it's likely that the data is
1062          * sorted.  Moving a single item to the new page is less work and can
1063          * push the fill factor higher than normal.  This is trivial when we
1064          * are splitting a new page before the beginning of the tree, all of
1065          * the interesting tests are against values of 0.
1066          *
1067          * Catching appends to the tree is harder.  In a simple append, we're
1068          * inserting an item that sorts past the end of the tree; the cursor
1069          * will point past the last element on the page.  But, in trees with
1070          * duplicates, the cursor may point to the last entry on the page --
1071          * in this case, the entry will also be the last element of a duplicate
1072          * set (the last because the search call specified the SR_DUPLAST flag).
1073          * The only way to differentiate between an insert immediately before
1074          * the last item in a tree or an append after a duplicate set which is
1075          * also the last item in the tree is to call the comparison function.
1076          * When splitting internal pages during an append, the search code
1077          * guarantees the cursor always points to the largest page item less
1078          * than the new internal entry.  To summarize, we want to catch three
1079          * possible index values:
1080          *
1081          *      NUM_ENT(page)           Btree/Recno leaf insert past end-of-tree
1082          *      NUM_ENT(page) - O_INDX  Btree or Recno internal insert past EOT
1083          *      NUM_ENT(page) - P_INDX  Btree leaf insert past EOT after a set
1084          *                                  of duplicates
1085          *
1086          * two of which, (NUM_ENT(page) - O_INDX or P_INDX) might be an insert
1087          * near the end of the tree, and not after the end of the tree at all.
1088          * Do a simple test which might be wrong because calling the comparison
1089          * functions is expensive.  Regardless, it's not a big deal if we're
1090          * wrong, we'll do the split the right way next time.
1091          */
1092         off = 0;
1093         if (NEXT_PGNO(pp) == PGNO_INVALID && cp->indx >= NUM_ENT(pp) - adjust)
1094                 off = NUM_ENT(pp) - adjust;
1095         else if (PREV_PGNO(pp) == PGNO_INVALID && cp->indx == 0)
1096                 off = adjust;
1097         if (off != 0)
1098                 goto sort;
1099
1100         /*
1101          * Split the data to the left and right pages.  Try not to split on
1102          * an overflow key.  (Overflow keys on internal pages will slow down
1103          * searches.)  Refuse to split in the middle of a set of duplicates.
1104          *
1105          * First, find the optimum place to split.
1106          *
1107          * It's possible to try and split past the last record on the page if
1108          * there's a very large record at the end of the page.  Make sure this
1109          * doesn't happen by bounding the check at the next-to-last entry on
1110          * the page.
1111          *
1112          * Note, we try and split half the data present on the page.  This is
1113          * because another process may have already split the page and left
1114          * it half empty.  We don't try and skip the split -- we don't know
1115          * how much space we're going to need on the page, and we may need up
1116          * to half the page for a big item, so there's no easy test to decide
1117          * if we need to split or not.  Besides, if two threads are inserting
1118          * data into the same place in the database, we're probably going to
1119          * need more space soon anyway.
1120          */
1121         top = NUM_ENT(pp) - adjust;
1122         half = (dbp->pgsize - HOFFSET(pp)) / 2;
1123         for (nbytes = 0, off = 0; off < top && nbytes < half; ++off)
1124                 switch (TYPE(pp)) {
1125                 case P_IBTREE:
1126                         if (B_TYPE(
1127                             GET_BINTERNAL(dbp, pp, off)->type) == B_KEYDATA)
1128                                 nbytes += BINTERNAL_SIZE(
1129                                    GET_BINTERNAL(dbp, pp, off)->len);
1130                         else
1131                                 nbytes += BINTERNAL_SIZE(BOVERFLOW_SIZE);
1132                         break;
1133                 case P_LBTREE:
1134                         if (B_TYPE(GET_BKEYDATA(dbp, pp, off)->type) ==
1135                             B_KEYDATA)
1136                                 nbytes += BKEYDATA_SIZE(GET_BKEYDATA(dbp,
1137                                     pp, off)->len);
1138                         else
1139                                 nbytes += BOVERFLOW_SIZE;
1140
1141                         ++off;
1142                         /* FALLTHROUGH */
1143                 case P_LDUP:
1144                 case P_LRECNO:
1145                         if (B_TYPE(GET_BKEYDATA(dbp, pp, off)->type) ==
1146                             B_KEYDATA)
1147                                 nbytes += BKEYDATA_SIZE(GET_BKEYDATA(dbp,
1148                                     pp, off)->len);
1149                         else
1150                                 nbytes += BOVERFLOW_SIZE;
1151                         break;
1152                 case P_IRECNO:
1153                         nbytes += RINTERNAL_SIZE;
1154                         break;
1155                 default:
1156                         return (__db_pgfmt(dbp->env, pp->pgno));
1157                 }
1158 sort:   splitp = off;
1159
1160         /*
1161          * Splitp is either at or just past the optimum split point.  If the
1162          * tree type is such that we're going to promote a key to an internal
1163          * page, and our current choice is an overflow key, look for something
1164          * close by that's smaller.
1165          */
1166         switch (TYPE(pp)) {
1167         case P_IBTREE:
1168                 iflag = 1;
1169                 isbigkey =
1170                     B_TYPE(GET_BINTERNAL(dbp, pp, off)->type) != B_KEYDATA;
1171                 break;
1172         case P_LBTREE:
1173         case P_LDUP:
1174                 iflag = 0;
1175                 isbigkey = B_TYPE(GET_BKEYDATA(dbp, pp, off)->type) !=
1176                     B_KEYDATA;
1177                 break;
1178         default:
1179                 iflag = isbigkey = 0;
1180         }
1181         if (isbigkey)
1182                 for (cnt = 1; cnt <= 3; ++cnt) {
1183                         off = splitp + cnt * adjust;
1184                         if (off < (db_indx_t)NUM_ENT(pp) &&
1185                             ((iflag && B_TYPE(
1186                             GET_BINTERNAL(dbp, pp,off)->type) == B_KEYDATA) ||
1187                             B_TYPE(GET_BKEYDATA(dbp, pp, off)->type) ==
1188                             B_KEYDATA)) {
1189                                 splitp = off;
1190                                 break;
1191                         }
1192                         if (splitp <= (db_indx_t)(cnt * adjust))
1193                                 continue;
1194                         off = splitp - cnt * adjust;
1195                         if (iflag ? B_TYPE(
1196                             GET_BINTERNAL(dbp, pp, off)->type) == B_KEYDATA :
1197                             B_TYPE(GET_BKEYDATA(dbp, pp, off)->type) ==
1198                             B_KEYDATA) {
1199                                 splitp = off;
1200                                 break;
1201                         }
1202                 }
1203
1204         /*
1205          * We can't split in the middle a set of duplicates.  We know that
1206          * no duplicate set can take up more than about 25% of the page,
1207          * because that's the point where we push it off onto a duplicate
1208          * page set.  So, this loop can't be unbounded.
1209          */
1210         if (TYPE(pp) == P_LBTREE &&
1211             inp[splitp] == inp[splitp - adjust])
1212                 for (cnt = 1;; ++cnt) {
1213                         off = splitp + cnt * adjust;
1214                         if (off < NUM_ENT(pp) &&
1215                             inp[splitp] != inp[off]) {
1216                                 splitp = off;
1217                                 break;
1218                         }
1219                         if (splitp <= (db_indx_t)(cnt * adjust))
1220                                 continue;
1221                         off = splitp - cnt * adjust;
1222                         if (inp[splitp] != inp[off]) {
1223                                 splitp = off + adjust;
1224                                 break;
1225                         }
1226                 }
1227
1228         /* We're going to split at splitp. */
1229         if ((ret = __bam_copy(dbp, pp, lp, 0, splitp)) != 0)
1230                 return (ret);
1231         if ((ret = __bam_copy(dbp, pp, rp, splitp, NUM_ENT(pp))) != 0)
1232                 return (ret);
1233
1234         *splitret = splitp;
1235         return (0);
1236 }
1237
1238 /*
1239  * __bam_copy --
1240  *      Copy a set of records from one page to another.
1241  *
1242  * PUBLIC: int __bam_copy __P((DB *, PAGE *, PAGE *, u_int32_t, u_int32_t));
1243  */
1244 int
1245 __bam_copy(dbp, pp, cp, nxt, stop)
1246         DB *dbp;
1247         PAGE *pp, *cp;
1248         u_int32_t nxt, stop;
1249 {
1250         BINTERNAL internal;
1251         db_indx_t *cinp, nbytes, off, *pinp;
1252
1253         cinp = P_INP(dbp, cp);
1254         pinp = P_INP(dbp, pp);
1255         /*
1256          * Nxt is the offset of the next record to be placed on the target page.
1257          */
1258         for (off = 0; nxt < stop; ++nxt, ++NUM_ENT(cp), ++off) {
1259                 switch (TYPE(pp)) {
1260                 case P_IBTREE:
1261                         if (off == 0 && nxt != 0)
1262                                 nbytes = BINTERNAL_SIZE(0);
1263                         else if (B_TYPE(
1264                             GET_BINTERNAL(dbp, pp, nxt)->type) == B_KEYDATA)
1265                                 nbytes = BINTERNAL_SIZE(
1266                                     GET_BINTERNAL(dbp, pp, nxt)->len);
1267                         else
1268                                 nbytes = BINTERNAL_SIZE(BOVERFLOW_SIZE);
1269                         break;
1270                 case P_LBTREE:
1271                         /*
1272                          * If we're on a key and it's a duplicate, just copy
1273                          * the offset.
1274                          */
1275                         if (off != 0 && (nxt % P_INDX) == 0 &&
1276                             pinp[nxt] == pinp[nxt - P_INDX]) {
1277                                 cinp[off] = cinp[off - P_INDX];
1278                                 continue;
1279                         }
1280                         /* FALLTHROUGH */
1281                 case P_LDUP:
1282                 case P_LRECNO:
1283                         if (B_TYPE(GET_BKEYDATA(dbp, pp, nxt)->type) ==
1284                             B_KEYDATA)
1285                                 nbytes = BKEYDATA_SIZE(GET_BKEYDATA(dbp,
1286                                     pp, nxt)->len);
1287                         else
1288                                 nbytes = BOVERFLOW_SIZE;
1289                         break;
1290                 case P_IRECNO:
1291                         nbytes = RINTERNAL_SIZE;
1292                         break;
1293                 default:
1294                         return (__db_pgfmt(dbp->env, pp->pgno));
1295                 }
1296                 cinp[off] = HOFFSET(cp) -= nbytes;
1297                 if (off == 0 && nxt != 0 && TYPE(pp) == P_IBTREE) {
1298                         internal.len = 0;
1299                         UMRW_SET(internal.unused);
1300                         internal.type = B_KEYDATA;
1301                         internal.pgno = GET_BINTERNAL(dbp, pp, nxt)->pgno;
1302                         internal.nrecs = GET_BINTERNAL(dbp, pp, nxt)->nrecs;
1303                         memcpy(P_ENTRY(dbp, cp, off), &internal, nbytes);
1304                 }
1305                 else
1306                         memcpy(P_ENTRY(dbp, cp, off),
1307                              P_ENTRY(dbp, pp, nxt), nbytes);
1308         }
1309         return (0);
1310 }