Fix mdb_split, tweak split_indx if space is tight
authorHoward Chu <hyc@symas.com>
Sat, 27 Aug 2011 23:53:58 +0000 (16:53 -0700)
committerHoward Chu <hyc@symas.com>
Thu, 1 Sep 2011 23:31:10 +0000 (16:31 -0700)
libraries/libmdb/mdb.c

index 8937a8e9fd98f69eb2c34e9cbb58d9df3e6190ab..d1efdd1b24d4cad1ba86d49eb8def9c13b033cdf 100644 (file)
@@ -2502,10 +2502,10 @@ mdb_add_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp, indx_t indx,
 
        assert(mp->mp_upper >= mp->mp_lower);
 
-       DPRINTF("add node [%s] to %s page %lu at index %i, key size %zu",
-           key ? DKEY(key) : NULL,
+       DPRINTF("add to %s page %lu index %i, data size %zu key size %zu [%s]",
            IS_LEAF(mp) ? "leaf" : "branch",
-           mp->mp_pgno, indx, key ? key->mv_size : 0);
+           mp->mp_pgno, indx, data ? data->mv_size : 0,
+               key ? key->mv_size : 0, key ? DKEY(key) : NULL);
 
        if (IS_LEAF2(mp)) {
                /* Move higher keys up one slot. */
@@ -3213,7 +3213,7 @@ mdb_split(MDB_txn *txn, MDB_dbi dbi, MDB_page **mpp, unsigned int *newindxp,
        int              rc = MDB_SUCCESS, ins_new = 0;
        indx_t           newindx;
        pgno_t           pgno = 0;
-       unsigned int     i, j, split_indx;
+       unsigned int     i, j, split_indx, nkeys, pmax;
        MDB_node        *node;
        MDB_val  sepkey, rkey, rdata;
        MDB_page        *copy;
@@ -3255,12 +3255,13 @@ mdb_split(MDB_txn *txn, MDB_dbi dbi, MDB_page **mpp, unsigned int *newindxp,
        rdp->h.md_pi = mdp->h.md_pi + 1;
        DPRINTF("new right sibling: page %lu", rdp->p.mp_pgno);
 
-       split_indx = NUMKEYS(&mdp->p) / 2 + 1;
+       nkeys = NUMKEYS(&mdp->p);
+       split_indx = nkeys / 2 + 1;
 
        if (IS_LEAF2(&rdp->p)) {
                char *split, *ins;
                int x;
-               unsigned int nkeys = NUMKEYS(&mdp->p), lsize, rsize, ksize;
+               unsigned int lsize, rsize, ksize;
                /* Move half of the keys to the right sibling */
                copy = NULL;
                x = *newindxp - split_indx;
@@ -3307,6 +3308,46 @@ mdb_split(MDB_txn *txn, MDB_dbi dbi, MDB_page **mpp, unsigned int *newindxp,
        memset(&mdp->p.mp_ptrs, 0, txn->mt_env->me_psize - PAGEHDRSZ);
        mdp->p.mp_lower = PAGEHDRSZ;
        mdp->p.mp_upper = txn->mt_env->me_psize;
+       /* For leaf pages, check the split point based on what
+        * fits where, since otherwise add_node can fail.
+        */
+       if (IS_LEAF(&mdp->p)) {
+               unsigned int psize, nsize;
+               /* Maximum free space in an empty page */
+               pmax = txn->mt_env->me_psize - PAGEHDRSZ;
+               nsize = mdb_leaf_size(txn->mt_env, newkey, newdata);
+               if (newindx <= split_indx) {
+split1:
+                       psize = nsize;
+                       for (i=0; i<split_indx; i++) {
+                               node = NODEPTR(&mdp->p, i);
+                               psize += NODESIZE + NODEKSZ(node);
+                               if (F_ISSET(node->mn_flags, F_BIGDATA))
+                                       psize += sizeof(pgno_t);
+                               else
+                                       psize += NODEDSZ(node);
+                               if (psize > pmax) {
+                                       split_indx--;
+                                       goto split1;
+                               }
+                       }
+               } else {
+split2:
+                       psize = nsize;
+                       for (i=split_indx; i<nkeys; i++) {
+                               node = NODEPTR(&mdp->p, i);
+                               psize += NODESIZE + NODEKSZ(node);
+                               if (F_ISSET(node->mn_flags, F_BIGDATA))
+                                       psize += sizeof(pgno_t);
+                               else
+                                       psize += NODEDSZ(node);
+                               if (psize > pmax) {
+                                       split_indx++;
+                                       goto split2;
+                               }
+                       }
+               }
+       }
 
        /* First find the separating key between the split pages.
         */