/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 1996-2009 Oracle.  All rights reserved.
 */
/*
 * Copyright (c) 1990, 1993, 1994, 1995, 1996
 *	Keith Bostic.  All rights reserved.
 */
/*
 * Copyright (c) 1990, 1993, 1994, 1995
 *	The Regents of the University of California.  All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the name of the University nor the names of its contributors
 *    may be used to endorse or promote products derived from this software
 *    without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */
#include "db_config.h"

#include "db_int.h"
#include "dbinc/db_page.h"
#include "dbinc/lock.h"
#include "dbinc/mp.h"
#include "dbinc/btree.h"
static int __bam_page __P((DBC *, EPG *, EPG *));
static int __bam_psplit __P((DBC *, EPG *, PAGE *, PAGE *, db_indx_t *));
static int __bam_root __P((DBC *, EPG *));
/*
 * __bam_split --
 *	Split a page.
 *
 * PUBLIC: int __bam_split __P((DBC *, void *, db_pgno_t *));
 */
int
__bam_split(dbc, arg, root_pgnop)
	DBC *dbc;
	void *arg;
	db_pgno_t *root_pgnop;
{
	BTREE_CURSOR *cp;
	DB_LOCK metalock, next_lock;
	enum { UP, DOWN } dir;
	db_pgno_t pgno, next_pgno, root_pgno;
	int exact, level, ret;

	cp = (BTREE_CURSOR *)dbc->internal;
	root_pgno = cp->root;
	LOCK_INIT(next_lock);
	next_pgno = PGNO_INVALID;

	/*
	 * First get a lock on the metadata page: we will have to allocate
	 * pages and cannot get a lock while we have the search tree pinned.
	 */
	pgno = PGNO_BASE_MD;
	if ((ret = __db_lget(dbc,
	    0, pgno, DB_LOCK_WRITE, 0, &metalock)) != 0)
		goto err;
	/*
	 * The locking protocol we use to avoid deadlock is to acquire locks
	 * by walking down the tree, but we do it as lazily as possible,
	 * locking the root only as a last resort.  We expect all stack pages
	 * to have been discarded before we're called; we discard all
	 * short-term locks.
	 *
	 * When __bam_split is first called, we know that a leaf page was too
	 * full for an insert.  We don't know what leaf page it was, but we
	 * have the key/recno that caused the problem.  We call XX_search to
	 * reacquire the leaf page, but this time get both the leaf page and
	 * its parent, locked.  We then split the leaf page and see if the new
	 * internal key will fit into the parent page.  If it will, we're done.
	 *
	 * If it won't, we discard our current locks and repeat the process,
	 * only this time acquiring the parent page and its parent, locked.
	 * This process repeats until we succeed in the split, splitting the
	 * root page as the last resort.  The entire process then repeats,
	 * as necessary, until we split a leaf page.
	 *
	 * A traditional method of speeding this up is to maintain a stack of
	 * the pages traversed in the original search.  You can detect if the
	 * stack is correct by storing the page's LSN when it was searched and
	 * comparing that LSN with the current one when it's locked during the
	 * split.  This would be an easy change for this code, but I have no
	 * numbers that indicate it's worthwhile.
	 */
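	/*
	 * An illustrative sketch of the loop below (not part of the build;
	 * every helper name in it is hypothetical): split bottom-up, backing
	 * off to re-split a parent whenever it cannot absorb the new key.
	 */
#if 0
	for (level = LEAFLEVEL;;) {
		lock_page_and_parent(key, level);	/* Walk down, lazily. */
		switch (try_split_at(level)) {
		case 0:					/* Split worked. */
			if (level == LEAFLEVEL)
				return (0);		/* Leaf split: done. */
			--level;			/* Work back down. */
			break;
		case DB_NEEDSPLIT:			/* Parent too full. */
			release_all_locks();
			++level;			/* Split the parent first. */
			break;
		}
	}
#endif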
	for (dir = UP, level = LEAFLEVEL;; dir == UP ? ++level : --level) {
		/*
		 * Acquire a page and its parent, locked.
		 */
retry:		if ((ret = (dbc->dbtype == DB_BTREE ?
		    __bam_search(dbc, PGNO_INVALID,
		    arg, SR_WRPAIR, level, NULL, &exact) :
		    __bam_rsearch(dbc,
		    (db_recno_t *)arg, SR_WRPAIR, level, &exact))) != 0)
			break;

		if (cp->csp[0].page->pgno == root_pgno) {
			/* We can overshoot the top of the tree. */
			level = cp->csp[0].page->level;
			if (root_pgnop != NULL)
				*root_pgnop = root_pgno;
		} else if (root_pgnop != NULL)
			*root_pgnop = cp->csp[-1].page->pgno;
		/*
		 * Split the page if it still needs it (it's possible another
		 * thread of control has already split the page).  If we are
		 * guaranteed that two items will fit on the page, the split
		 * is no longer necessary.
		 */
		if (2 * B_MAXSIZEONPAGE(cp->ovflsize)
		    <= (db_indx_t)P_FREESPACE(dbc->dbp, cp->csp[0].page)) {
			if ((ret = __bam_stkrel(dbc, STK_NOLOCK)) != 0)
				goto err;
			goto no_split;
		}

		/*
		 * We need to try to lock the next page so we can update
		 * its PREV_PGNO after the split.
		 */
		if (dbc->dbtype == DB_BTREE && ISLEAF(cp->csp->page) &&
		    (pgno = NEXT_PGNO(cp->csp->page)) != PGNO_INVALID) {
			TRY_LOCK(dbc, pgno,
			    next_pgno, next_lock, DB_LOCK_WRITE, retry);
			if (ret != 0)
				goto err;
		}

		ret = cp->csp[0].page->pgno == root_pgno ?
		    __bam_root(dbc, &cp->csp[0]) :
		    __bam_page(dbc, &cp->csp[-1], &cp->csp[0]);
		BT_STK_CLR(cp);

		switch (ret) {
		case 0:
no_split:		/* Once we've split the leaf page, we're done. */
			if (level == LEAFLEVEL)
				goto done;

			/* Switch directions. */
			if (dir == UP)
				dir = DOWN;
			break;
		case DB_NEEDSPLIT:
			/*
			 * It's possible to fail to split repeatedly, as other
			 * threads may be modifying the tree, or the page usage
			 * is sufficiently bad that we don't get enough space
			 * the first time.
			 *
			 * Switch directions.
			 */
			if (dir == DOWN)
				dir = UP;
			break;
		default:
			goto err;
		}
	}

err:	if (root_pgnop != NULL)
		*root_pgnop = cp->root;
done:	(void)__LPUT(dbc, metalock);
	(void)__TLPUT(dbc, next_lock);
	return (ret);
}
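
/*
 * A minimal usage sketch (not part of the build): the insert path typically
 * calls __bam_split when an item doesn't fit, then retries the insert.  The
 * surrounding code here is hypothetical.
 */
#if 0
	while ((ret = __bam_iitem(dbc, key, data, DB_KEYLAST, 0)) ==
	    DB_NEEDSPLIT) {
		if ((ret = __bam_split(dbc, key, NULL)) != 0)
			break;
	}
#endif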
/*
 * __bam_root --
 *	Split the root page of a btree.
 */
static int
__bam_root(dbc, cp)
	DBC *dbc;
	EPG *cp;
{
	DB *dbp;
	DBT log_dbt, rootent[2];
	DB_LOCK llock, rlock;
	DB_LSN log_lsn;
	DB_MPOOLFILE *mpf;
	PAGE *lp, *rp;
	db_indx_t split;
	u_int32_t opflags;
	int ret, t_ret;

	dbp = dbc->dbp;
	mpf = dbp->mpf;
	lp = rp = NULL;
	LOCK_INIT(llock);
	LOCK_INIT(rlock);
	COMPQUIET(log_dbt.data, NULL);

	if (cp->page->level >= MAXBTREELEVEL) {
		__db_errx(dbp->env,
		    "Too many btree levels: %d", cp->page->level);
		ret = ENOSPC;
		goto err;
	}

	if ((ret = __memp_dirty(mpf,
	    &cp->page, dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
		goto err;
	/* Create new left and right pages for the split. */
	if ((ret = __db_new(dbc, TYPE(cp->page), &llock, &lp)) != 0 ||
	    (ret = __db_new(dbc, TYPE(cp->page), &rlock, &rp)) != 0)
		goto err;
	P_INIT(lp, dbp->pgsize, lp->pgno,
	    PGNO_INVALID, ISINTERNAL(cp->page) ? PGNO_INVALID : rp->pgno,
	    cp->page->level, TYPE(cp->page));
	P_INIT(rp, dbp->pgsize, rp->pgno,
	    ISINTERNAL(cp->page) ? PGNO_INVALID : lp->pgno, PGNO_INVALID,
	    cp->page->level, TYPE(cp->page));

	/* Split the page. */
	if ((ret = __bam_psplit(dbc, cp, lp, rp, &split)) != 0)
		goto err;

	if (DBC_LOGGING(dbc)) {
		memset(&log_dbt, 0, sizeof(log_dbt));
		if ((ret =
		    __os_malloc(dbp->env, dbp->pgsize, &log_dbt.data)) != 0)
			goto err;
		log_dbt.size = dbp->pgsize;
		memcpy(log_dbt.data, cp->page, dbp->pgsize);
	}
	/* Clean up the new root page. */
	if ((ret = (dbc->dbtype == DB_RECNO ?
	    __ram_root(dbc, cp->page, lp, rp) :
	    __bam_broot(dbc, cp->page, split, lp, rp))) != 0) {
		if (DBC_LOGGING(dbc))
			__os_free(dbp->env, log_dbt.data);
		goto err;
	}
	/* Log the change. */
	if (DBC_LOGGING(dbc)) {
		memset(rootent, 0, sizeof(rootent));
		rootent[0].data = GET_BINTERNAL(dbp, cp->page, 0);
		rootent[1].data = GET_BINTERNAL(dbp, cp->page, 1);
		if (dbc->dbtype == DB_RECNO)
			rootent[0].size = rootent[1].size = RINTERNAL_SIZE;
		else {
			rootent[0].size = BINTERNAL_SIZE(
			    ((BINTERNAL *)rootent[0].data)->len);
			rootent[1].size = BINTERNAL_SIZE(
			    ((BINTERNAL *)rootent[1].data)->len);
		}
		ZERO_LSN(log_lsn);
		opflags = F_ISSET(
		    (BTREE_CURSOR *)dbc->internal, C_RECNUM) ? SPL_NRECS : 0;
		if (dbc->dbtype == DB_RECNO)
			opflags |= SPL_RECNO;
		ret = __bam_split_log(dbp,
		    dbc->txn, &LSN(cp->page), 0, PGNO(lp), &LSN(lp), PGNO(rp),
		    &LSN(rp), (u_int32_t)NUM_ENT(lp), PGNO_INVALID, &log_lsn,
		    dbc->internal->root, &LSN(cp->page), 0,
		    &log_dbt, &rootent[0], &rootent[1], opflags);
		__os_free(dbp->env, log_dbt.data);
		if (ret != 0)
			goto err;
	} else
		LSN_NOT_LOGGED(LSN(cp->page));
	LSN(lp) = LSN(cp->page);
	LSN(rp) = LSN(cp->page);

	/* Adjust any cursors. */
	ret = __bam_ca_split(dbc, cp->page->pgno, lp->pgno, rp->pgno, split, 1);
	/* Success or error: release pages and locks. */
err:	if (cp->page != NULL && (t_ret = __memp_fput(mpf,
	    dbc->thread_info, cp->page, dbc->priority)) != 0 && ret == 0)
		ret = t_ret;

	/*
	 * We are done.  Put or downgrade all our locks and release
	 * the pages.
	 */
	if ((t_ret = __TLPUT(dbc, llock)) != 0 && ret == 0)
		ret = t_ret;
	if ((t_ret = __TLPUT(dbc, rlock)) != 0 && ret == 0)
		ret = t_ret;
	if ((t_ret = __TLPUT(dbc, cp->lock)) != 0 && ret == 0)
		ret = t_ret;
	if (lp != NULL && (t_ret = __memp_fput(mpf,
	    dbc->thread_info, lp, dbc->priority)) != 0 && ret == 0)
		ret = t_ret;
	if (rp != NULL && (t_ret = __memp_fput(mpf,
	    dbc->thread_info, rp, dbc->priority)) != 0 && ret == 0)
		ret = t_ret;

	return (ret);
}
/*
 * __bam_page --
 *	Split the non-root page of a btree.
 */
static int
__bam_page(dbc, pp, cp)
	DBC *dbc;
	EPG *pp, *cp;
{
	BTREE_CURSOR *bc;
	DB *dbp;
	DBT log_dbt, rentry;
	DB_LOCK rplock;
	DB_LSN log_lsn, save_lsn;
	DB_MPOOLFILE *mpf;
	PAGE *lp, *rp, *alloc_rp, *tp;
	db_indx_t split;
	u_int32_t opflags;
	int ret, t_ret;

	dbp = dbc->dbp;
	mpf = dbp->mpf;
	alloc_rp = lp = rp = tp = NULL;
	LOCK_INIT(rplock);

	/*
	 * Create new left page for the split, and fill in everything
	 * except its LSN and next-page page number.
	 *
	 * Create a new right page for the split, and fill in everything
	 * except its LSN and page number.
	 *
	 * We malloc space for both the left and right pages, so we don't get
	 * a new page from the underlying buffer pool until we know the split
	 * is going to succeed.  The reason is that we can't release locks
	 * acquired during the get-a-new-page process because metadata page
	 * locks can't be discarded on failure since we may have modified the
	 * free list.  So, if you assume that we're holding a write lock on the
	 * leaf page which ran out of space and started this split (e.g., we
	 * have already written records to the page, or we retrieved a record
	 * from it with the DB_RMW flag set), failing in a split with both a
	 * leaf page locked and the metadata page locked can potentially lock
	 * up the tree badly, because we've violated the rule of always locking
	 * down the tree, and never up.
	 */
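	/*
	 * Illustrative sketch (not part of the build) of the scratch-page
	 * approach described above: build both halves in malloc'd memory
	 * first, and only allocate a real page once nothing else can fail.
	 * All helper names in this sketch are hypothetical.
	 */
#if 0
	scratch = malloc(2 * pgsize);		/* No locks needed. */
	build_left_half(scratch);
	build_right_half(scratch + pgsize);
	if (parent_has_room()) {		/* Now commit to the split: */
		new_pg = allocate_db_page();	/* takes the metadata lock */
		copy_halves_into_place(new_pg);
	} else
		free(scratch);			/* Nothing to undo. */
#endif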
	if ((ret = __os_malloc(dbp->env, dbp->pgsize * 2, &lp)) != 0)
		goto err;
	P_INIT(lp, dbp->pgsize, PGNO(cp->page),
	    ISINTERNAL(cp->page) ? PGNO_INVALID : PREV_PGNO(cp->page),
	    ISINTERNAL(cp->page) ? PGNO_INVALID : 0,
	    cp->page->level, TYPE(cp->page));

	rp = (PAGE *)((u_int8_t *)lp + dbp->pgsize);
	P_INIT(rp, dbp->pgsize, 0,
	    ISINTERNAL(cp->page) ? PGNO_INVALID : PGNO(cp->page),
	    ISINTERNAL(cp->page) ? PGNO_INVALID : NEXT_PGNO(cp->page),
	    cp->page->level, TYPE(cp->page));

	/*
	 * Split right.
	 *
	 * Only the indices are sorted on the page, i.e., the key/data pairs
	 * aren't, so it's simpler to copy the data from the split page onto
	 * two new pages instead of copying half the data to a new right page
	 * and compacting the left page in place.  Since the left page can't
	 * change, we swap the original and the allocated left page after the
	 * split.
	 */
	if ((ret = __bam_psplit(dbc, cp, lp, rp, &split)) != 0)
		goto err;
	/*
	 * Test to see if we are going to be able to insert the new pages into
	 * the parent page.  The interesting failure here is that the parent
	 * page can't hold the new keys, and has to be split in turn, in which
	 * case we want to release all the locks we can.
	 */
	if ((ret = __bam_pinsert(dbc, pp, split, lp, rp, BPI_SPACEONLY)) != 0)
		goto err;

	/*
	 * We've got everything locked down we need, and we know the split
	 * is going to succeed.  Go and get the additional page we'll need.
	 */
	if ((ret = __db_new(dbc, TYPE(cp->page), &rplock, &alloc_rp)) != 0)
		goto err;

	/*
	 * Prepare to fix up the previous pointer of any leaf page following
	 * the split page.  Our caller has already write locked the page so
	 * we can get it without deadlocking on the parent latch.
	 */
	if (ISLEAF(cp->page) && NEXT_PGNO(cp->page) != PGNO_INVALID &&
	    (ret = __memp_fget(mpf, &NEXT_PGNO(cp->page),
	    dbc->thread_info, dbc->txn, DB_MPOOL_DIRTY, &tp)) != 0)
		goto err;

	/*
	 * Fix up the page numbers we didn't have before.  We have to do this
	 * before calling __bam_pinsert because it may copy a page number onto
	 * the parent page and it takes the page number from its page argument.
	 */
	PGNO(rp) = NEXT_PGNO(lp) = PGNO(alloc_rp);

	DB_ASSERT(dbp->env, IS_DIRTY(cp->page));
	DB_ASSERT(dbp->env, IS_DIRTY(pp->page));

	/* Actually update the parent page. */
	if ((ret = __bam_pinsert(dbc, pp, split, lp, rp, BPI_NOLOGGING)) != 0)
		goto err;

	bc = (BTREE_CURSOR *)dbc->internal;
	/* Log the change. */
	if (DBC_LOGGING(dbc)) {
		memset(&log_dbt, 0, sizeof(log_dbt));
		log_dbt.data = cp->page;
		log_dbt.size = dbp->pgsize;
		memset(&rentry, 0, sizeof(rentry));
		rentry.data = GET_BINTERNAL(dbp, pp->page, pp->indx + 1);
		opflags = F_ISSET(bc, C_RECNUM) ? SPL_NRECS : 0;
		if (dbc->dbtype == DB_RECNO) {
			opflags |= SPL_RECNO;
			rentry.size = RINTERNAL_SIZE;
		} else
			rentry.size =
			    BINTERNAL_SIZE(((BINTERNAL *)rentry.data)->len);
		ZERO_LSN(log_lsn);
		if ((ret = __bam_split_log(dbp, dbc->txn, &LSN(cp->page), 0,
		    PGNO(cp->page), &LSN(cp->page), PGNO(alloc_rp),
		    &LSN(alloc_rp), (u_int32_t)NUM_ENT(lp),
		    tp == NULL ? 0 : PGNO(tp), tp == NULL ? &log_lsn : &LSN(tp),
		    PGNO(pp->page), &LSN(pp->page), pp->indx,
		    &log_dbt, NULL, &rentry, opflags)) != 0) {
			/*
			 * Undo the update to the parent page, which has not
			 * been logged yet.  This must succeed.
			 */
			t_ret = __db_ditem_nolog(dbc, pp->page,
			    pp->indx + 1, rentry.size);
			DB_ASSERT(dbp->env, t_ret == 0);
			goto err;
		}
	} else
		LSN_NOT_LOGGED(LSN(cp->page));

	/* Update the LSNs for all involved pages. */
	LSN(alloc_rp) = LSN(cp->page);
	LSN(lp) = LSN(cp->page);
	LSN(rp) = LSN(cp->page);
	LSN(pp->page) = LSN(cp->page);
	if (tp != NULL) {
		/* Log record has been written; now it is safe to update next page. */
		PREV_PGNO(tp) = PGNO(rp);
		LSN(tp) = LSN(cp->page);
	}
	/*
	 * Copy the left and right pages into place.  There are two paths
	 * through here.  If we are logging, the LSNs were set in the logging
	 * path.  If we are not logging, we do not have valid LSNs on lp or
	 * rp.  The correct LSNs to use are the ones on the page we got from
	 * __db_new or the one that was originally on cp->page.  In both
	 * cases, we save the LSN from the real database page (not a malloc'd
	 * one) and reapply it after we do the copy.
	 */
	save_lsn = alloc_rp->lsn;
	memcpy(alloc_rp, rp, LOFFSET(dbp, rp));
	memcpy((u_int8_t *)alloc_rp + HOFFSET(rp),
	    (u_int8_t *)rp + HOFFSET(rp), dbp->pgsize - HOFFSET(rp));
	alloc_rp->lsn = save_lsn;

	save_lsn = cp->page->lsn;
	memcpy(cp->page, lp, LOFFSET(dbp, lp));
	memcpy((u_int8_t *)cp->page + HOFFSET(lp),
	    (u_int8_t *)lp + HOFFSET(lp), dbp->pgsize - HOFFSET(lp));
	cp->page->lsn = save_lsn;
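	/*
	 * Why two memcpy calls per page: Berkeley DB pages are slotted, with
	 * the header and index array growing down from the top of the page
	 * and the items growing up from the bottom, leaving a free-space gap
	 * in the middle.  LOFFSET() is the end of the index array and
	 * HOFFSET() the start of the item area, so copying [0, LOFFSET) and
	 * [HOFFSET, pgsize) moves everything except the gap.  A generic
	 * sketch (illustrative only, not part of the build):
	 */
#if 0
	memcpy(dst, src, loffset);			/* Header + index array. */
	memcpy((u_int8_t *)dst + hoffset,		/* Items, skipping the gap. */
	    (u_int8_t *)src + hoffset, pgsize - hoffset);
#endif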
	/* Adjust any cursors. */
	if ((ret = __bam_ca_split(dbc,
	    PGNO(cp->page), PGNO(cp->page), PGNO(rp), split, 0)) != 0)
		goto err;

	__os_free(dbp->env, lp);

	/*
	 * Success -- write the real pages back to the store.
	 */
	if ((t_ret = __memp_fput(mpf,
	    dbc->thread_info, alloc_rp, dbc->priority)) != 0 && ret == 0)
		ret = t_ret;
	if ((t_ret = __TLPUT(dbc, rplock)) != 0 && ret == 0)
		ret = t_ret;
	if (tp != NULL &&
	    (t_ret = __memp_fput(mpf,
	    dbc->thread_info, tp, dbc->priority)) != 0 && ret == 0)
		ret = t_ret;
	if ((t_ret = __bam_stkrel(dbc, STK_CLRDBC)) != 0 && ret == 0)
		ret = t_ret;
	return (ret);

err:	if (lp != NULL)
		__os_free(dbp->env, lp);
	if (alloc_rp != NULL)
		(void)__memp_fput(mpf,
		    dbc->thread_info, alloc_rp, dbc->priority);
	if (tp != NULL)
		(void)__memp_fput(mpf, dbc->thread_info, tp, dbc->priority);

	if (pp->page != NULL)
		(void)__memp_fput(mpf,
		    dbc->thread_info, pp->page, dbc->priority);

	if (ret == DB_NEEDSPLIT)
		(void)__LPUT(dbc, pp->lock);
	else
		(void)__TLPUT(dbc, pp->lock);

	(void)__memp_fput(mpf, dbc->thread_info, cp->page, dbc->priority);

	/*
	 * We don't drop the left and right page locks.  If we are doing
	 * dirty reads then we need to hold the locks until we abort the
	 * transaction.  If we are not transactional, we are hosed anyway
	 * as the tree is trashed.  It may be better not to leak the locks.
	 */
	if (dbc->txn == NULL)
		(void)__LPUT(dbc, rplock);

	if (dbc->txn == NULL || ret == DB_NEEDSPLIT)
		(void)__LPUT(dbc, cp->lock);

	return (ret);
}
/*
 * __bam_broot --
 *	Fix up the btree root page after it has been split.
 *
 * PUBLIC: int __bam_broot __P((DBC *, PAGE *, u_int32_t, PAGE *, PAGE *));
 */
int
__bam_broot(dbc, rootp, split, lp, rp)
	DBC *dbc;
	u_int32_t split;
	PAGE *rootp, *lp, *rp;
{
	BINTERNAL bi, bi0, *child_bi;
	BKEYDATA *child_bk;
	BOVERFLOW bo, *child_bo;
	BTREE_CURSOR *cp;
	DB *dbp;
	DBT hdr, hdr0, data;
	db_pgno_t root_pgno;
	int ret;

	dbp = dbc->dbp;
	cp = (BTREE_CURSOR *)dbc->internal;
	child_bo = NULL;
	data.data = NULL;
	memset(&bi, 0, sizeof(bi));
	bi.pgno = rp->pgno;

	switch (TYPE(rootp)) {
	case P_IBTREE:
		/* Copy the first key of the child page onto the root page. */
		child_bi = GET_BINTERNAL(dbp, rootp, split);
		switch (B_TYPE(child_bi->type)) {
		case B_KEYDATA:
			bi.len = child_bi->len;
			B_TSET(bi.type, B_KEYDATA);
			DB_SET_DBT(hdr, &bi, SSZA(BINTERNAL, data));
			if ((ret = __os_malloc(dbp->env,
			    child_bi->len, &data.data)) != 0)
				return (ret);
			memcpy(data.data, child_bi->data, child_bi->len);
			data.size = child_bi->len;
			break;
		case B_OVERFLOW:
			/* Reuse the overflow key. */
			child_bo = (BOVERFLOW *)child_bi->data;
			memset(&bo, 0, sizeof(bo));
			bo.type = B_OVERFLOW;
			bo.tlen = child_bo->tlen;
			bo.pgno = child_bo->pgno;
			bi.len = BOVERFLOW_SIZE;
			B_TSET(bi.type, B_OVERFLOW);
			DB_SET_DBT(hdr, &bi, SSZA(BINTERNAL, data));
			DB_SET_DBT(data, &bo, BOVERFLOW_SIZE);
			break;
		case B_DUPLICATE:
		default:
			goto pgfmt;
		}
		break;
	case P_LDUP:
	case P_LBTREE:
		/* Copy the first key of the child page onto the root page. */
		child_bk = GET_BKEYDATA(dbp, rootp, split);
		switch (B_TYPE(child_bk->type)) {
		case B_KEYDATA:
			bi.len = child_bk->len;
			B_TSET(bi.type, B_KEYDATA);
			DB_SET_DBT(hdr, &bi, SSZA(BINTERNAL, data));
			if ((ret = __os_malloc(dbp->env,
			    child_bk->len, &data.data)) != 0)
				return (ret);
			memcpy(data.data, child_bk->data, child_bk->len);
			data.size = child_bk->len;
			break;
		case B_OVERFLOW:
			/* Copy the overflow key. */
			child_bo = (BOVERFLOW *)child_bk;
			memset(&bo, 0, sizeof(bo));
			bo.type = B_OVERFLOW;
			bo.tlen = child_bo->tlen;
			memset(&hdr, 0, sizeof(hdr));
			if ((ret = __db_goff(dbc, &hdr, child_bo->tlen,
			    child_bo->pgno, &hdr.data, &hdr.size)) == 0)
				ret = __db_poff(dbc, &hdr, &bo.pgno);

			if (hdr.data != NULL)
				__os_free(dbp->env, hdr.data);
			if (ret != 0)
				return (ret);

			bi.len = BOVERFLOW_SIZE;
			B_TSET(bi.type, B_OVERFLOW);
			DB_SET_DBT(hdr, &bi, SSZA(BINTERNAL, data));
			DB_SET_DBT(data, &bo, BOVERFLOW_SIZE);
			break;
		case B_DUPLICATE:
		default:
			goto pgfmt;
		}
		break;
	default:
pgfmt:		return (__db_pgfmt(dbp->env, rp->pgno));
	}
	/*
	 * If the root page was a leaf page, change it into an internal page.
	 * We copy the key we split on (but not the key's data, in the case of
	 * a leaf page) to the new root page.
	 */
	root_pgno = cp->root;
	P_INIT(rootp, dbp->pgsize,
	    root_pgno, PGNO_INVALID, PGNO_INVALID, lp->level + 1, P_IBTREE);

	/*
	 * The btree comparison code guarantees that the left-most key on any
	 * internal btree page is never used, so it doesn't need to be filled
	 * in.  Set the record count if necessary.
	 */
	memset(&bi0, 0, sizeof(bi0));
	B_TSET(bi0.type, B_KEYDATA);
	bi0.pgno = lp->pgno;
	if (F_ISSET(cp, C_RECNUM)) {
		bi0.nrecs = __bam_total(dbp, lp);
		RE_NREC_SET(rootp, bi0.nrecs);
		bi.nrecs = __bam_total(dbp, rp);
		RE_NREC_ADJ(rootp, bi.nrecs);
	}
	DB_SET_DBT(hdr0, &bi0, SSZA(BINTERNAL, data));
	if ((ret = __db_pitem_nolog(dbc, rootp,
	    0, BINTERNAL_SIZE(0), &hdr0, NULL)) != 0)
		goto err;
	ret = __db_pitem_nolog(dbc, rootp, 1,
	    BINTERNAL_SIZE(data.size), &hdr, &data);

err:	if (data.data != NULL && child_bo == NULL)
		__os_free(dbp->env, data.data);
	return (ret);
}
/*
 * __ram_root --
 *	Fix up the recno root page after it has been split.
 *
 * PUBLIC: int __ram_root __P((DBC *, PAGE *, PAGE *, PAGE *));
 */
int
__ram_root(dbc, rootp, lp, rp)
	DBC *dbc;
	PAGE *rootp, *lp, *rp;
{
	DB *dbp;
	DBT hdr;
	RINTERNAL ri;
	db_pgno_t root_pgno;
	int ret;

	dbp = dbc->dbp;
	root_pgno = dbc->internal->root;

	/* Initialize the page. */
	P_INIT(rootp, dbp->pgsize,
	    root_pgno, PGNO_INVALID, PGNO_INVALID, lp->level + 1, P_IRECNO);

	/* Initialize the header. */
	DB_SET_DBT(hdr, &ri, RINTERNAL_SIZE);

	/* Insert the left and right keys, set the header information. */
	ri.pgno = lp->pgno;
	ri.nrecs = __bam_total(dbp, lp);
	if ((ret = __db_pitem_nolog(dbc,
	    rootp, 0, RINTERNAL_SIZE, &hdr, NULL)) != 0)
		return (ret);
	RE_NREC_SET(rootp, ri.nrecs);
	ri.pgno = rp->pgno;
	ri.nrecs = __bam_total(dbp, rp);
	if ((ret = __db_pitem_nolog(dbc,
	    rootp, 1, RINTERNAL_SIZE, &hdr, NULL)) != 0)
		return (ret);
	RE_NREC_ADJ(rootp, ri.nrecs);
	return (0);
}
/*
 * __bam_pinsert --
 *	Insert a new key into a parent page, completing the split.
 *
 * PUBLIC: int __bam_pinsert
 * PUBLIC:     __P((DBC *, EPG *, u_int32_t, PAGE *, PAGE *, int));
 */
int
__bam_pinsert(dbc, parent, split, lchild, rchild, flags)
	DBC *dbc;
	EPG *parent;
	u_int32_t split;
	PAGE *lchild, *rchild;
	int flags;
{
	BINTERNAL bi, *child_bi;
	BKEYDATA *child_bk, *tmp_bk;
	BOVERFLOW bo, *child_bo;
	BTREE *t;
	BTREE_CURSOR *cp;
	DB *dbp;
	DBT a, b, hdr, data;
	EPG *child;
	PAGE *ppage;
	RINTERNAL ri;
	db_indx_t off;
	db_recno_t nrecs;
	size_t (*func) __P((DB *, const DBT *, const DBT *));
	int (*pitem) __P((DBC *, PAGE *, u_int32_t, u_int32_t, DBT *, DBT *));
	u_int32_t n, nbytes, nksize, oldsize, size;
	int ret;

	dbp = dbc->dbp;
	cp = (BTREE_CURSOR *)dbc->internal;
	t = dbp->bt_internal;
	ppage = parent->page;
	child = parent + 1;
	memset(&data, 0, sizeof(data));

	/* If handling record numbers, count records split to the right page. */
	nrecs = F_ISSET(cp, C_RECNUM) &&
	    !LF_ISSET(BPI_SPACEONLY) ? __bam_total(dbp, rchild) : 0;
	/*
	 * Now we insert the new page's first key into the parent page, which
	 * completes the split.  The parent points to a PAGE and a page index
	 * offset, where the new key goes ONE AFTER the index, because the
	 * split was to the right.
	 *
	 * Some btree algorithms replace the key for the old page as well as
	 * the new page.  We don't, as there's no reason to believe that the
	 * first key on the old page is any better than the key we have, and,
	 * in the case of a key being placed at index 0 causing the split, the
	 * key is unavailable.
	 */
	off = parent->indx + O_INDX;
	if (LF_ISSET(BPI_REPLACE))
		oldsize = TYPE(ppage) == P_IRECNO ? RINTERNAL_PSIZE :
		    BINTERNAL_PSIZE(GET_BINTERNAL(dbp, ppage, off)->len);
	else
		oldsize = 0;
	/*
	 * Calculate the space needed on the parent page.
	 *
	 * Prefix trees: space hack used when inserting into BINTERNAL pages.
	 * Retain only what's needed to distinguish between the new entry and
	 * the LAST entry on the page to its left.  If the keys compare equal,
	 * retain the entire key.  We ignore overflow keys, and the entire key
	 * must be retained for the next-to-leftmost key on the leftmost page
	 * of each level, or the search will fail.  Applicable ONLY to internal
	 * pages that have leaf pages as children.  Further reduction of the
	 * key between pairs of internal pages loses too much information.
	 */
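	/*
	 * A worked example of the space hack (illustrative only): if the last
	 * key on the left child is "app" and the first key on the right child
	 * is "apple", the parent needs only "appl" -- the shared prefix plus
	 * one byte -- to route searches correctly.  A minimal sketch of such
	 * a prefix function (hypothetical; the real callback is t->bt_prefix,
	 * which defaults to __bam_defpfx):
	 */
#if 0
	size_t
	prefix_len(const DBT *a, const DBT *b)	/* Assumes a sorts before b. */
	{
		size_t cnt, len;
		u_int8_t *pa, *pb;

		cnt = 1;
		len = a->size > b->size ? b->size : a->size;
		for (pa = a->data, pb = b->data; len--; ++pa, ++pb, ++cnt)
			if (*pa != *pb)
				return (cnt);	/* Shortest distinguishing length. */
		/* b extends a: one extra byte distinguishes them. */
		return (a->size < b->size ? a->size + 1 : b->size);
	}
#endif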
	switch (TYPE(child->page)) {
	case P_IBTREE:
		child_bi = GET_BINTERNAL(dbp, child->page, split);
		nbytes = BINTERNAL_PSIZE(child_bi->len);

		if (P_FREESPACE(dbp, ppage) + oldsize < nbytes)
			return (DB_NEEDSPLIT);
		if (LF_ISSET(BPI_SPACEONLY))
			return (0);

		switch (B_TYPE(child_bi->type)) {
		case B_KEYDATA:
			/* Add a new record for the right page. */
			memset(&bi, 0, sizeof(bi));
			bi.len = child_bi->len;
			B_TSET(bi.type, B_KEYDATA);
			bi.pgno = rchild->pgno;
			DB_SET_DBT(hdr, &bi, SSZA(BINTERNAL, data));
			DB_SET_DBT(data, child_bi->data, child_bi->len);
			size = BINTERNAL_SIZE(child_bi->len);
			break;
		case B_OVERFLOW:
			/* Reuse the overflow key. */
			child_bo = (BOVERFLOW *)child_bi->data;
			memset(&bo, 0, sizeof(bo));
			bo.type = B_OVERFLOW;
			bo.tlen = child_bo->tlen;
			bo.pgno = child_bo->pgno;
			bi.len = BOVERFLOW_SIZE;
			B_TSET(bi.type, B_OVERFLOW);
			bi.pgno = rchild->pgno;
			DB_SET_DBT(hdr, &bi, SSZA(BINTERNAL, data));
			DB_SET_DBT(data, &bo, BOVERFLOW_SIZE);
			size = BINTERNAL_SIZE(BOVERFLOW_SIZE);
			break;
		case B_DUPLICATE:
		default:
			goto pgfmt;
		}
		break;
	case P_LDUP:
	case P_LBTREE:
		child_bk = GET_BKEYDATA(dbp, child->page, split);
		switch (B_TYPE(child_bk->type)) {
		case B_KEYDATA:
			nbytes = BINTERNAL_PSIZE(child_bk->len);
			nksize = child_bk->len;

			/*
			 * Prefix compression:
			 * We set t->bt_prefix to NULL if we have a comparison
			 * callback but no prefix compression callback.  But,
			 * if we're splitting in an off-page duplicates tree,
			 * we still have to do some checking.  If using the
			 * default off-page duplicates comparison routine we
			 * can use the default prefix compression callback.  If
			 * not using the default off-page duplicates comparison
			 * routine, we can't do any kind of prefix compression
			 * as there's no way for an application to specify a
			 * prefix compression callback that corresponds to its
			 * comparison callback.
			 *
			 * No prefix compression if we don't have a compression
			 * function, or the key we'd compress isn't a normal
			 * key (for example, it references an overflow page).
			 *
			 * Generate a parent page key for the right child page
			 * from a comparison of the last key on the left child
			 * page and the first key on the right child page.
			 */
			if (F_ISSET(dbc, DBC_OPD)) {
				if (dbp->dup_compare == __bam_defcmp)
					func = __bam_defpfx;
				else
					func = NULL;
			} else
				func = t->bt_prefix;
			if (func == NULL)
				goto noprefix;
			tmp_bk = GET_BKEYDATA(dbp, lchild, NUM_ENT(lchild) -
			    (TYPE(lchild) == P_LDUP ? O_INDX : P_INDX));
			if (B_TYPE(tmp_bk->type) != B_KEYDATA)
				goto noprefix;
			DB_INIT_DBT(a, tmp_bk->data, tmp_bk->len);
			DB_INIT_DBT(b, child_bk->data, child_bk->len);
			nksize = (u_int32_t)func(dbp, &a, &b);
			if ((n = BINTERNAL_PSIZE(nksize)) < nbytes)
				nbytes = n;
			else
				nksize = child_bk->len;

noprefix:		if (P_FREESPACE(dbp, ppage) + oldsize < nbytes)
				return (DB_NEEDSPLIT);
			if (LF_ISSET(BPI_SPACEONLY))
				return (0);

			memset(&bi, 0, sizeof(bi));
			bi.len = nksize;
			B_TSET(bi.type, B_KEYDATA);
			bi.pgno = rchild->pgno;
			DB_SET_DBT(hdr, &bi, SSZA(BINTERNAL, data));
			DB_SET_DBT(data, child_bk->data, nksize);
			size = BINTERNAL_SIZE(nksize);
			break;
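		/*
		 * Illustrative sketch (not part of the build) of the
		 * separator-key generation above: with a left child ending in
		 * "carrot" and a right child beginning with "celery", the
		 * computed parent key is just "ce" -- enough to route any
		 * search, since everything < "ce" goes left and everything
		 * >= "ce" goes right.
		 */
#if 0
		DB_INIT_DBT(a, "carrot", 6);	/* Last key, left child. */
		DB_INIT_DBT(b, "celery", 6);	/* First key, right child. */
		nksize = (u_int32_t)__bam_defpfx(dbp, &a, &b);	/* == 2: "ce" */
#endif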
		case B_OVERFLOW:
			nbytes = BINTERNAL_PSIZE(BOVERFLOW_SIZE);

			if (P_FREESPACE(dbp, ppage) + oldsize < nbytes)
				return (DB_NEEDSPLIT);
			if (LF_ISSET(BPI_SPACEONLY))
				return (0);

			/* Copy the overflow key. */
			child_bo = (BOVERFLOW *)child_bk;
			memset(&bo, 0, sizeof(bo));
			bo.type = B_OVERFLOW;
			bo.tlen = child_bo->tlen;
			memset(&hdr, 0, sizeof(hdr));
			if ((ret = __db_goff(dbc, &hdr, child_bo->tlen,
			    child_bo->pgno, &hdr.data, &hdr.size)) == 0)
				ret = __db_poff(dbc, &hdr, &bo.pgno);

			if (hdr.data != NULL)
				__os_free(dbp->env, hdr.data);
			if (ret != 0)
				return (ret);

			memset(&bi, 0, sizeof(bi));
			bi.len = BOVERFLOW_SIZE;
			B_TSET(bi.type, B_OVERFLOW);
			bi.pgno = rchild->pgno;
			DB_SET_DBT(hdr, &bi, SSZA(BINTERNAL, data));
			DB_SET_DBT(data, &bo, BOVERFLOW_SIZE);
			size = BINTERNAL_SIZE(BOVERFLOW_SIZE);
			break;
		case B_DUPLICATE:
		default:
			goto pgfmt;
		}
		break;
	case P_IRECNO:
		nbytes = RINTERNAL_PSIZE;

		if (P_FREESPACE(dbp, ppage) + oldsize < nbytes)
			return (DB_NEEDSPLIT);
		if (LF_ISSET(BPI_SPACEONLY))
			return (0);

		/* Add a new record for the right page. */
		DB_SET_DBT(hdr, &ri, RINTERNAL_SIZE);
		ri.pgno = rchild->pgno;
		ri.nrecs = nrecs;
		size = RINTERNAL_SIZE;

		/*
		 * For now, we are locking internal recno nodes so
		 * replace the key in two steps: delete, then insert.
		 */
		if (LF_ISSET(BPI_REPLACE)) {
			if ((ret = __bam_ditem(dbc, ppage, off)) != 0)
				return (ret);
			LF_CLR(BPI_REPLACE);
		}
		break;
	default:
pgfmt:		return (__db_pgfmt(dbp->env, PGNO(child->page)));
	}
	if (LF_ISSET(BPI_REPLACE)) {
		DB_ASSERT(dbp->env, !LF_ISSET(BPI_NOLOGGING));
		if ((ret = __bam_irep(dbc, ppage,
		    off, &hdr, data.size != 0 ? &data : NULL)) != 0)
			return (ret);
	} else {
		if (LF_ISSET(BPI_NOLOGGING))
			pitem = __db_pitem_nolog;
		else
			pitem = __db_pitem;

		if ((ret = pitem(dbc, ppage,
		    off, size, &hdr, data.size != 0 ? &data : NULL)) != 0)
			return (ret);
	}

	/*
	 * If a Recno or Btree with record numbers AM page, or an off-page
	 * duplicates tree, adjust the parent page's left page record count.
	 */
	if (F_ISSET(cp, C_RECNUM) && !LF_ISSET(BPI_NORECNUM)) {
		/* Log the change. */
		if (DBC_LOGGING(dbc)) {
			if ((ret = __bam_cadjust_log(dbp, dbc->txn,
			    &LSN(ppage), 0, PGNO(ppage), &LSN(ppage),
			    parent->indx, -(int32_t)nrecs, 0)) != 0)
				return (ret);
		} else
			LSN_NOT_LOGGED(LSN(ppage));

		/* Update the left page count. */
		if (dbc->dbtype == DB_RECNO)
			GET_RINTERNAL(dbp, ppage, parent->indx)->nrecs -= nrecs;
		else
			GET_BINTERNAL(dbp, ppage, parent->indx)->nrecs -= nrecs;
	}

	return (0);
}
/*
 * __bam_psplit --
 *	Do the real work of splitting the page.
 */
static int
__bam_psplit(dbc, cp, lp, rp, splitret)
	DBC *dbc;
	EPG *cp;
	PAGE *lp, *rp;
	db_indx_t *splitret;
{
	DB *dbp;
	PAGE *pp;
	db_indx_t half, *inp, nbytes, off, splitp, top;
	int adjust, cnt, iflag, isbigkey, ret;

	dbp = dbc->dbp;
	pp = cp->page;
	inp = P_INP(dbp, pp);
	adjust = TYPE(pp) == P_LBTREE ? P_INDX : O_INDX;
	/*
	 * If we're splitting the first (last) page on a level because we're
	 * inserting (appending) a key to it, it's likely that the data is
	 * sorted.  Moving a single item to the new page is less work and can
	 * push the fill factor higher than normal.  This is trivial when we
	 * are splitting a new page before the beginning of the tree; all of
	 * the interesting tests are against values of 0.
	 *
	 * Catching appends to the tree is harder.  In a simple append, we're
	 * inserting an item that sorts past the end of the tree; the cursor
	 * will point past the last element on the page.  But, in trees with
	 * duplicates, the cursor may point to the last entry on the page --
	 * in this case, the entry will also be the last element of a duplicate
	 * set (the last because the search call specified the SR_DUPLAST flag).
	 * The only way to differentiate between an insert immediately before
	 * the last item in a tree or an append after a duplicate set which is
	 * also the last item in the tree is to call the comparison function.
	 * When splitting internal pages during an append, the search code
	 * guarantees the cursor always points to the largest page item less
	 * than the new internal entry.  To summarize, we want to catch three
	 * possible index values:
	 *
	 *	NUM_ENT(page)		Btree/Recno leaf insert past end-of-tree
	 *	NUM_ENT(page) - O_INDX	Btree or Recno internal insert past EOT
	 *	NUM_ENT(page) - P_INDX	Btree leaf insert past EOT after a set
	 *				    of duplicates
	 *
	 * two of which, (NUM_ENT(page) - O_INDX or P_INDX) might be an insert
	 * near the end of the tree, and not after the end of the tree at all.
	 * We do a simple test which might be wrong because calling the
	 * comparison functions is expensive.  Regardless, it's not a big deal
	 * if we're wrong; we'll do the split the right way next time.
	 */
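	/*
	 * A worked example of the index arithmetic (illustrative only): on a
	 * P_LBTREE leaf, entries come in key/data pairs, so a page holding 10
	 * pairs has NUM_ENT(pp) == 20 and adjust == P_INDX == 2.  The insert
	 * is then treated as an append when the cursor index is at or past
	 * entry 18, i.e. on or after the final pair.  "append" below is a
	 * hypothetical variable for illustration.
	 */
#if 0
	/* 10 pairs: NUM_ENT(pp) == 20, adjust == 2, threshold == 18. */
	append = NEXT_PGNO(pp) == PGNO_INVALID &&
	    cp->indx >= NUM_ENT(pp) - adjust;
#endif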
	off = 0;
	if (NEXT_PGNO(pp) == PGNO_INVALID && cp->indx >= NUM_ENT(pp) - adjust)
		off = NUM_ENT(pp) - adjust;
	else if (PREV_PGNO(pp) == PGNO_INVALID && cp->indx == 0)
		off = adjust;

	if (off != 0)
		goto sort;
	/*
	 * Split the data to the left and right pages.  Try not to split on
	 * an overflow key.  (Overflow keys on internal pages will slow down
	 * searches.)  Refuse to split in the middle of a set of duplicates.
	 *
	 * First, find the optimum place to split.
	 *
	 * It's possible to try and split past the last record on the page if
	 * there's a very large record at the end of the page.  Make sure this
	 * doesn't happen by bounding the check at the next-to-last entry on
	 * the page.
	 *
	 * Note, we try and split half the data present on the page.  This is
	 * because another process may have already split the page and left
	 * it half empty.  We don't try and skip the split -- we don't know
	 * how much space we're going to need on the page, and we may need up
	 * to half the page for a big item, so there's no easy test to decide
	 * if we need to split or not.  Besides, if two threads are inserting
	 * data into the same place in the database, we're probably going to
	 * need more space soon anyway.
	 */
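	/*
	 * Illustrative sketch (not part of the build) of the loop below: walk
	 * the items in index order, accumulating their on-page sizes, and
	 * stop once half the used bytes are on the left page.  Splitting by
	 * bytes rather than by item count keeps the halves balanced even when
	 * item sizes vary widely.  item_size() is hypothetical.
	 */
#if 0
	half = used_bytes / 2;
	for (nbytes = 0, off = 0; off < top && nbytes < half; ++off)
		nbytes += item_size(pp, off);
	splitp = off;
#endif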
	top = NUM_ENT(pp) - adjust;
	half = (dbp->pgsize - HOFFSET(pp)) / 2;
	for (nbytes = 0, off = 0; off < top && nbytes < half; ++off)
		switch (TYPE(pp)) {
		case P_IBTREE:
			if (B_TYPE(
			    GET_BINTERNAL(dbp, pp, off)->type) == B_KEYDATA)
				nbytes += BINTERNAL_SIZE(
				    GET_BINTERNAL(dbp, pp, off)->len);
			else
				nbytes += BINTERNAL_SIZE(BOVERFLOW_SIZE);
			break;
		case P_LBTREE:
			if (B_TYPE(GET_BKEYDATA(dbp, pp, off)->type) ==
			    B_KEYDATA)
				nbytes += BKEYDATA_SIZE(GET_BKEYDATA(dbp,
				    pp, off)->len);
			else
				nbytes += BOVERFLOW_SIZE;

			++off;
			/* FALLTHROUGH */
		case P_LDUP:
		case P_LRECNO:
			if (B_TYPE(GET_BKEYDATA(dbp, pp, off)->type) ==
			    B_KEYDATA)
				nbytes += BKEYDATA_SIZE(GET_BKEYDATA(dbp,
				    pp, off)->len);
			else
				nbytes += BOVERFLOW_SIZE;
			break;
		case P_IRECNO:
			nbytes += RINTERNAL_SIZE;
			break;
		default:
			return (__db_pgfmt(dbp->env, pp->pgno));
		}
sort:	splitp = off;
	/*
	 * Splitp is either at or just past the optimum split point.  If the
	 * tree type is such that we're going to promote a key to an internal
	 * page, and our current choice is an overflow key, look for something
	 * close by that's smaller.
	 */
	switch (TYPE(pp)) {
	case P_IBTREE:
		iflag = 1;
		isbigkey =
		    B_TYPE(GET_BINTERNAL(dbp, pp, off)->type) != B_KEYDATA;
		break;
	case P_LBTREE:
	case P_LDUP:
		iflag = 0;
		isbigkey = B_TYPE(GET_BKEYDATA(dbp, pp, off)->type) !=
		    B_KEYDATA;
		break;
	default:
		iflag = isbigkey = 0;
	}
	if (isbigkey)
		for (cnt = 1; cnt <= 3; ++cnt) {
			off = splitp + cnt * adjust;
			if (off < (db_indx_t)NUM_ENT(pp) &&
			    ((iflag && B_TYPE(
			    GET_BINTERNAL(dbp, pp, off)->type) == B_KEYDATA) ||
			    B_TYPE(GET_BKEYDATA(dbp, pp, off)->type) ==
			    B_KEYDATA)) {
				splitp = off;
				break;
			}
			if (splitp <= (db_indx_t)(cnt * adjust))
				continue;
			off = splitp - cnt * adjust;
			if (iflag ? B_TYPE(
			    GET_BINTERNAL(dbp, pp, off)->type) == B_KEYDATA :
			    B_TYPE(GET_BKEYDATA(dbp, pp, off)->type) ==
			    B_KEYDATA) {
				splitp = off;
				break;
			}
		}
	/*
	 * We can't split in the middle of a set of duplicates.  We know that
	 * no duplicate set can take up more than about 25% of the page,
	 * because that's the point where we push it off onto a duplicate
	 * page set.  So, this loop can't be unbounded.
	 */
	if (TYPE(pp) == P_LBTREE &&
	    inp[splitp] == inp[splitp - adjust])
		for (cnt = 1;; ++cnt) {
			off = splitp + cnt * adjust;
			if (off < NUM_ENT(pp) &&
			    inp[splitp] != inp[off]) {
				splitp = off;
				break;
			}
			if (splitp <= (db_indx_t)(cnt * adjust))
				continue;
			off = splitp - cnt * adjust;
			if (inp[splitp] != inp[off]) {
				splitp = off + adjust;
				break;
			}
		}
	/* We're going to split at splitp. */
	if ((ret = __bam_copy(dbp, pp, lp, 0, splitp)) != 0)
		return (ret);
	if ((ret = __bam_copy(dbp, pp, rp, splitp, NUM_ENT(pp))) != 0)
		return (ret);

	*splitret = splitp;
	return (0);
}
/*
 * __bam_copy --
 *	Copy a set of records from one page to another.
 *
 * PUBLIC: int __bam_copy __P((DB *, PAGE *, PAGE *, u_int32_t, u_int32_t));
 */
int
__bam_copy(dbp, pp, cp, nxt, stop)
	DB *dbp;
	PAGE *pp, *cp;
	u_int32_t nxt, stop;
{
	BINTERNAL internal;
	db_indx_t *cinp, nbytes, off, *pinp;

	cinp = P_INP(dbp, cp);
	pinp = P_INP(dbp, pp);

	/*
	 * Nxt is the offset of the next record to be placed on the target
	 * page.
	 */
	for (off = 0; nxt < stop; ++nxt, ++NUM_ENT(cp), ++off) {
		switch (TYPE(pp)) {
		case P_IBTREE:
			/*
			 * Suppress the first key on an internal page: it is
			 * never compared against, so keep only its header.
			 */
			if (off == 0 && nxt != 0)
				nbytes = BINTERNAL_SIZE(0);
			else if (B_TYPE(
			    GET_BINTERNAL(dbp, pp, nxt)->type) == B_KEYDATA)
				nbytes = BINTERNAL_SIZE(
				    GET_BINTERNAL(dbp, pp, nxt)->len);
			else
				nbytes = BINTERNAL_SIZE(BOVERFLOW_SIZE);
			break;
		case P_LBTREE:
			/*
			 * If we're on a key and it's a duplicate, just copy
			 * the offset.
			 */
			if (off != 0 && (nxt % P_INDX) == 0 &&
			    pinp[nxt] == pinp[nxt - P_INDX]) {
				cinp[off] = cinp[off - P_INDX];
				continue;
			}
			/* FALLTHROUGH */
		case P_LDUP:
		case P_LRECNO:
			if (B_TYPE(GET_BKEYDATA(dbp, pp, nxt)->type) ==
			    B_KEYDATA)
				nbytes = BKEYDATA_SIZE(GET_BKEYDATA(dbp,
				    pp, nxt)->len);
			else
				nbytes = BOVERFLOW_SIZE;
			break;
		case P_IRECNO:
			nbytes = RINTERNAL_SIZE;
			break;
		default:
			return (__db_pgfmt(dbp->env, pp->pgno));
		}
		cinp[off] = HOFFSET(cp) -= nbytes;
		if (off == 0 && nxt != 0 && TYPE(pp) == P_IBTREE) {
			/*
			 * Copy the BINTERNAL header, dropping the key bytes
			 * of the suppressed first key.
			 */
			internal.len = 0;
			UMRW_SET(internal.unused);
			internal.type = B_KEYDATA;
			internal.pgno = GET_BINTERNAL(dbp, pp, nxt)->pgno;
			internal.nrecs = GET_BINTERNAL(dbp, pp, nxt)->nrecs;
			memcpy(P_ENTRY(dbp, cp, off), &internal, nbytes);
		} else
			memcpy(P_ENTRY(dbp, cp, off),
			    P_ENTRY(dbp, pp, nxt), nbytes);
	}
	return (0);
}
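
/*
 * A minimal usage sketch (not part of the build): __bam_psplit above drives
 * __bam_copy to distribute a full page between two empty scratch pages,
 * copying index range [0, splitp) to the left page and [splitp, NUM_ENT)
 * to the right page.
 */
#if 0
	if ((ret = __bam_copy(dbp, pp, lp, 0, splitp)) != 0)
		return (ret);
	if ((ret = __bam_copy(dbp, pp, rp, splitp, NUM_ENT(pp))) != 0)
		return (ret);
#endif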