ITS#7505 Fix mdb_update_key when key is too big
authorHoward Chu <hyc@openldap.org>
Sun, 27 Jan 2013 17:55:21 +0000 (17:55 +0000)
committerHoward Chu <hyc@openldap.org>
Sun, 27 Jan 2013 18:02:18 +0000 (18:02 +0000)
libraries/liblmdb/mdb.c

index 575e494..061d752 100644 (file)
@@ -1009,7 +1009,7 @@ static size_t     mdb_leaf_size(MDB_env *env, MDB_val *key, MDB_val *data);
 static size_t  mdb_branch_size(MDB_env *env, MDB_val *key);
 
 static int     mdb_rebalance(MDB_cursor *mc);
-static int     mdb_update_key(MDB_page *mp, indx_t indx, MDB_val *key);
+static int     mdb_update_key(MDB_cursor *mc, MDB_val *key);
 
 static void    mdb_cursor_pop(MDB_cursor *mc);
 static int     mdb_cursor_push(MDB_cursor *mc, MDB_page *mp);
@@ -1120,17 +1120,22 @@ mdb_page_list(MDB_page *mp)
        DKBUF;
 
        nkeys = NUMKEYS(mp);
-       fprintf(stderr, "numkeys %d\n", nkeys);
+       fprintf(stderr, "Page %zu numkeys %d\n", mp->mp_pgno, nkeys);
        for (i=0; i<nkeys; i++) {
                node = NODEPTR(mp, i);
                key.mv_size = node->mn_ksize;
                key.mv_data = node->mn_data;
                nsize = NODESIZE + NODEKSZ(node) + sizeof(indx_t);
-               if (F_ISSET(node->mn_flags, F_BIGDATA))
-                       nsize += sizeof(pgno_t);
-               else
-                       nsize += NODEDSZ(node);
-               fprintf(stderr, "key %d: nsize %d, %s\n", i, nsize, DKEY(&key));
+               if (IS_BRANCH(mp)) {
+                       fprintf(stderr, "key %d: page %zu, %s\n", i, NODEPGNO(node),
+                               DKEY(&key));
+               } else {
+                       if (F_ISSET(node->mn_flags, F_BIGDATA))
+                               nsize += sizeof(pgno_t);
+                       else
+                               nsize += NODEDSZ(node);
+                       fprintf(stderr, "key %d: nsize %d, %s\n", i, nsize, DKEY(&key));
+               }
        }
 }
 
@@ -5782,15 +5787,18 @@ mdb_cursor_dbi(MDB_cursor *mc)
  * @return 0 on success, non-zero on failure.
  */
 static int
-mdb_update_key(MDB_page *mp, indx_t indx, MDB_val *key)
+mdb_update_key(MDB_cursor *mc, MDB_val *key)
 {
+       MDB_page                *mp;
        MDB_node                *node;
        char                    *base;
        size_t                   len;
        int                      delta, delta0;
-       indx_t                   ptr, i, numkeys;
+       indx_t                   ptr, i, numkeys, indx;
        DKBUF;
 
+       indx = mc->mc_ki[mc->mc_top];
+       mp = mc->mc_pg[mc->mc_top];
        node = NODEPTR(mp, indx);
        ptr = mp->mp_ptrs[indx];
 #if MDB_DEBUG
@@ -5815,8 +5823,12 @@ mdb_update_key(MDB_page *mp, indx_t indx, MDB_val *key)
        delta += (delta & 1);
        if (delta) {
                if (delta > 0 && SIZELEFT(mp) < delta) {
-                       DPRINTF("OUCH! Not enough room, delta = %d", delta);
-                       return MDB_PAGE_FULL;
+                       pgno_t pgno;
+                       /* not enough space left, do a delete and split */
+                       DPRINTF("Not enough room, delta = %d, splitting...", delta);
+                       pgno = NODEPGNO(node);
+                       mdb_node_del(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], 0);
+                       return mdb_page_split(mc, key, NULL, pgno, MDB_SPLIT_REPLACE);
                }
 
                numkeys = NUMKEYS(mp);
@@ -5843,15 +5855,19 @@ mdb_update_key(MDB_page *mp, indx_t indx, MDB_val *key)
        return MDB_SUCCESS;
 }
 
+static void
+mdb_cursor_copy(const MDB_cursor *csrc, MDB_cursor *cdst);
+
 /** Move a node from csrc to cdst.
  */
 static int
 mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst)
 {
-       int                      rc;
        MDB_node                *srcnode;
        MDB_val          key, data;
        pgno_t  srcpg;
+       MDB_cursor mn;
+       int                      rc;
        unsigned short flags;
 
        DKBUF;
@@ -5912,7 +5928,11 @@ mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst)
                }
                cdst->mc_snum = snum--;
                cdst->mc_top = snum;
-               rc = mdb_update_key(cdst->mc_pg[cdst->mc_top], 0, &bkey);
+               mdb_cursor_copy(cdst, &mn);
+               mn.mc_ki[snum] = 0;
+               rc = mdb_update_key(&mn, &bkey);
+               if (rc)
+                       return rc;
        }
 
        DPRINTF("moving %s node %u [%s] on page %zu to node %u on page %zu",
@@ -5968,14 +5988,19 @@ mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst)
                        }
                        DPRINTF("update separator for source page %zu to [%s]",
                                csrc->mc_pg[csrc->mc_top]->mp_pgno, DKEY(&key));
-                       if ((rc = mdb_update_key(csrc->mc_pg[csrc->mc_top-1], csrc->mc_ki[csrc->mc_top-1],
-                               &key)) != MDB_SUCCESS)
+                       mdb_cursor_copy(csrc, &mn);
+                       mn.mc_snum--;
+                       mn.mc_top--;
+                       if ((rc = mdb_update_key(&mn, &key)) != MDB_SUCCESS)
                                return rc;
                }
                if (IS_BRANCH(csrc->mc_pg[csrc->mc_top])) {
                        MDB_val  nullkey;
+                       indx_t  ix = csrc->mc_ki[csrc->mc_top];
                        nullkey.mv_size = 0;
-                       rc = mdb_update_key(csrc->mc_pg[csrc->mc_top], 0, &nullkey);
+                       csrc->mc_ki[csrc->mc_top] = 0;
+                       rc = mdb_update_key(csrc, &nullkey);
+                       csrc->mc_ki[csrc->mc_top] = ix;
                        assert(rc == MDB_SUCCESS);
                }
        }
@@ -5991,14 +6016,19 @@ mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst)
                        }
                        DPRINTF("update separator for destination page %zu to [%s]",
                                cdst->mc_pg[cdst->mc_top]->mp_pgno, DKEY(&key));
-                       if ((rc = mdb_update_key(cdst->mc_pg[cdst->mc_top-1], cdst->mc_ki[cdst->mc_top-1],
-                               &key)) != MDB_SUCCESS)
+                       mdb_cursor_copy(cdst, &mn);
+                       mn.mc_snum--;
+                       mn.mc_top--;
+                       if ((rc = mdb_update_key(&mn, &key)) != MDB_SUCCESS)
                                return rc;
                }
                if (IS_BRANCH(cdst->mc_pg[cdst->mc_top])) {
                        MDB_val  nullkey;
+                       indx_t  ix = cdst->mc_ki[cdst->mc_top];
                        nullkey.mv_size = 0;
-                       rc = mdb_update_key(cdst->mc_pg[cdst->mc_top], 0, &nullkey);
+                       cdst->mc_ki[cdst->mc_top] = 0;
+                       rc = mdb_update_key(cdst, &nullkey);
+                       cdst->mc_ki[cdst->mc_top] = ix;
                        assert(rc == MDB_SUCCESS);
                }
        }
@@ -6083,7 +6113,10 @@ mdb_page_merge(MDB_cursor *csrc, MDB_cursor *cdst)
        mdb_node_del(csrc->mc_pg[csrc->mc_top-1], csrc->mc_ki[csrc->mc_top-1], 0);
        if (csrc->mc_ki[csrc->mc_top-1] == 0) {
                key.mv_size = 0;
-               if ((rc = mdb_update_key(csrc->mc_pg[csrc->mc_top-1], 0, &key)) != MDB_SUCCESS)
+               csrc->mc_top--;
+               rc = mdb_update_key(csrc, &key);
+               csrc->mc_top++;
+               if (rc)
                        return rc;
        }
 
@@ -6287,14 +6320,14 @@ mdb_rebalance(MDB_cursor *mc)
         * Otherwise we should try to merge them.
         */
        if (PAGEFILL(mc->mc_txn->mt_env, mn.mc_pg[mn.mc_top]) >= FILL_THRESHOLD && NUMKEYS(mn.mc_pg[mn.mc_top]) >= 2)
-               return mdb_node_move(&mn, mc);
-       else { /* FIXME: if (has_enough_room()) */
-               mc->mc_flags &= ~C_INITIALIZED;
+               rc = mdb_node_move(&mn, mc);
+       else {
                if (mc->mc_ki[ptop] == 0)
-                       return mdb_page_merge(&mn, mc);
+                       rc = mdb_page_merge(&mn, mc);
                else
-                       return mdb_page_merge(mc, &mn);
+                       rc = mdb_page_merge(mc, &mn);
        }
+       return rc;
 }
 
 /** Complete a delete operation started by #mdb_cursor_del(). */
@@ -6320,6 +6353,7 @@ mdb_cursor_del0(MDB_cursor *mc, MDB_node *leaf)
        mdb_node_del(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], mc->mc_db->md_pad);
        mc->mc_db->md_entries--;
        rc = mdb_rebalance(mc);
+       mc->mc_flags &= ~C_INITIALIZED;
        if (rc != MDB_SUCCESS)
                mc->mc_txn->mt_flags |= MDB_TXN_ERROR;
 
@@ -6364,8 +6398,20 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi,
                xdata = NULL;
        }
        rc = mdb_cursor_set(&mc, key, xdata, op, &exact);
-       if (rc == 0)
+       if (rc == 0) {
+               /* let mdb_page_split know about this cursor if needed:
+                * delete will trigger a rebalance; if it needs to move
+                * a node from one page to another, it will have to
+                * update the parent's separator key(s). If the new sepkey
+                * is larger than the current one, the parent page may
+                * run out of space, triggering a split. We need this
+                * cursor to be consistent until the end of the rebalance.
+                */
+               mc.mc_next = txn->mt_cursors[dbi];
+               txn->mt_cursors[dbi] = &mc;
                rc = mdb_cursor_del(&mc, data ? 0 : MDB_NODUPDATA);
+               txn->mt_cursors[dbi] = mc.mc_next;
+       }
        return rc;
 }