Add MDB_RESERVE mode
authorHoward Chu <hyc@symas.com>
Sun, 2 Oct 2011 17:02:30 +0000 (10:02 -0700)
committerHoward Chu <hyc@symas.com>
Sun, 2 Oct 2011 17:02:30 +0000 (10:02 -0700)
When putting a record, just make space for the data, don't copy it.
(Not compatible with MDB_DUPSORT, since the actual data is needed
to determine the insert location.)

libraries/libmdb/mdb.c
libraries/libmdb/mdb.h

index 7b290ca..c6bc4f9 100644 (file)
@@ -598,6 +598,10 @@ typedef struct MDB_node {
 #define F_BIGDATA       0x01                   /**< data put on overflow page */
 #define F_SUBDATA       0x02                   /**< data is a sub-database */
 #define F_DUPDATA       0x04                   /**< data has duplicates */
+
+/** valid flags for #mdb_node_add() */
+#define        NODE_ADD_FLAGS  (F_DUPDATA|F_SUBDATA|MDB_RESERVE)
+
 /** @} */
        unsigned short  mn_flags;               /**< @ref mdb_node */
        unsigned short  mn_ksize;               /**< key size */
@@ -886,7 +890,7 @@ static int  mdb_page_search(MDB_cursor *mc,
                            MDB_val *key, int modify);
 static int     mdb_page_merge(MDB_cursor *csrc, MDB_cursor *cdst);
 static int     mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata,
-                               pgno_t newpgno);
+                               pgno_t newpgno, unsigned int nflags);
 
 static int  mdb_env_read_header(MDB_env *env, MDB_meta *meta);
 static int  mdb_env_read_meta(MDB_env *env, int *which);
@@ -894,7 +898,7 @@ static int  mdb_env_write_meta(MDB_txn *txn);
 
 static MDB_node *mdb_node_search(MDB_cursor *mc, MDB_val *key, int *exactp);
 static int  mdb_node_add(MDB_cursor *mc, indx_t indx,
-                           MDB_val *key, MDB_val *data, pgno_t pgno, uint8_t flags);
+                           MDB_val *key, MDB_val *data, pgno_t pgno, unsigned int flags);
 static void mdb_node_del(MDB_page *mp, indx_t indx, int ksize);
 static void mdb_node_shrink(MDB_page *mp, indx_t indx);
 static int     mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst);
@@ -3875,6 +3879,7 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
        int rc, rc2;
        char pbuf[PAGESIZE];
        char dbuf[MAXKEYSIZE+1];
+       unsigned int nflags;
        DKBUF;
 
        if (F_ISSET(mc->mc_txn->mt_flags, MDB_TXN_RDONLY))
@@ -4068,12 +4073,13 @@ current:
        rdata = data;
 
 new_sub:
+       nflags = flags & NODE_ADD_FLAGS;
        nsize = IS_LEAF2(mc->mc_pg[mc->mc_top]) ? key->mv_size : mdb_leaf_size(mc->mc_txn->mt_env, key, rdata);
        if (SIZELEFT(mc->mc_pg[mc->mc_top]) < nsize) {
-               rc = mdb_page_split(mc, key, rdata, P_INVALID);
+               rc = mdb_page_split(mc, key, rdata, P_INVALID, nflags);
        } else {
                /* There is room already in this leaf page. */
-               rc = mdb_node_add(mc, mc->mc_ki[mc->mc_top], key, rdata, 0, 0);
+               rc = mdb_node_add(mc, mc->mc_ki[mc->mc_top], key, rdata, 0, nflags);
                if (rc == 0 && !do_sub) {
                        /* Adjust other cursors pointing to mp */
                        MDB_cursor *m2, *m3;
@@ -4100,12 +4106,6 @@ new_sub:
        if (rc != MDB_SUCCESS)
                mc->mc_txn->mt_flags |= MDB_TXN_ERROR;
        else {
-               /* Remember if we just added a subdatabase */
-               if (flags & (F_SUBDATA|F_DUPDATA)) {
-                       leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
-                       leaf->mn_flags |= (flags & (F_SUBDATA|F_DUPDATA));
-               }
-
                /* Now store the actual data in the child DB. Note that we're
                 * storing the user data in the keys field, so there are strict
                 * size limits on dupdata. The actual data fields of the child
@@ -4117,6 +4117,7 @@ new_sub:
 put_sub:
                        xdata.mv_size = 0;
                        xdata.mv_data = "";
+                       leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
                        if (flags & MDB_CURRENT) {
                                xflags = MDB_CURRENT;
                        } else {
@@ -4311,7 +4312,7 @@ mdb_branch_size(MDB_env *env, MDB_val *key)
  */
 static int
 mdb_node_add(MDB_cursor *mc, indx_t indx,
-    MDB_val *key, MDB_val *data, pgno_t pgno, uint8_t flags)
+    MDB_val *key, MDB_val *data, pgno_t pgno, unsigned int flags)
 {
        unsigned int     i;
        size_t           node_size = NODESIZE;
@@ -4407,13 +4408,18 @@ mdb_node_add(MDB_cursor *mc, indx_t indx,
                        if (F_ISSET(flags, F_BIGDATA))
                                memcpy(node->mn_data + key->mv_size, data->mv_data,
                                    sizeof(pgno_t));
+                       else if (F_ISSET(flags, MDB_RESERVE))
+                               data->mv_data = node->mn_data + key->mv_size;
                        else
                                memcpy(node->mn_data + key->mv_size, data->mv_data,
                                    data->mv_size);
                } else {
                        memcpy(node->mn_data + key->mv_size, &ofp->mp_pgno,
                            sizeof(pgno_t));
-                       memcpy(METADATA(ofp), data->mv_data, data->mv_size);
+                       if (F_ISSET(flags, MDB_RESERVE))
+                               data->mv_data = METADATA(ofp);
+                       else
+                               memcpy(METADATA(ofp), data->mv_data, data->mv_size);
                }
        }
 
@@ -5219,15 +5225,16 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi,
  * @return 0 on success, non-zero on failure.
  */
 static int
-mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno)
+mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno,
+       unsigned int nflags)
 {
-       uint8_t          flags;
+       unsigned int flags;
        int              rc = MDB_SUCCESS, ins_new = 0, new_root = 0;
        indx_t           newindx;
        pgno_t           pgno = 0;
        unsigned int     i, j, split_indx, nkeys, pmax;
        MDB_node        *node;
-       MDB_val  sepkey, rkey, rdata;
+       MDB_val  sepkey, rkey, xdata, *rdata = &xdata;
        MDB_page        *copy;
        MDB_page        *mp, *rp, *pp;
        unsigned int ptop;
@@ -5385,7 +5392,7 @@ newsep:
        if (SIZELEFT(mn.mc_pg[ptop]) < mdb_branch_size(mc->mc_txn->mt_env, &sepkey)) {
                mn.mc_snum--;
                mn.mc_top--;
-               rc = mdb_page_split(&mn, &sepkey, NULL, rp->mp_pgno);
+               rc = mdb_page_split(&mn, &sepkey, NULL, rp->mp_pgno, 0);
 
                /* Right page might now have changed parent.
                 * Check if left page also changed parent.
@@ -5432,11 +5439,10 @@ newsep:
                        rkey.mv_data = newkey->mv_data;
                        rkey.mv_size = newkey->mv_size;
                        if (IS_LEAF(mp)) {
-                               rdata.mv_data = newdata->mv_data;
-                               rdata.mv_size = newdata->mv_size;
+                               rdata = newdata;
                        } else
                                pgno = newpgno;
-                       flags = 0;
+                       flags = nflags;
 
                        ins_new = 1;
 
@@ -5449,8 +5455,9 @@ newsep:
                        rkey.mv_data = NODEKEY(node);
                        rkey.mv_size = node->mn_ksize;
                        if (IS_LEAF(mp)) {
-                               rdata.mv_data = NODEDATA(node);
-                               rdata.mv_size = NODEDSZ(node);
+                               xdata.mv_data = NODEDATA(node);
+                               xdata.mv_size = NODEDSZ(node);
+                               rdata = &xdata;
                        } else
                                pgno = NODEPGNO(node);
                        flags = node->mn_flags;
@@ -5463,7 +5470,7 @@ newsep:
                        rkey.mv_size = 0;
                }
 
-               rc = mdb_node_add(mc, j, &rkey, &rdata, pgno, flags);
+               rc = mdb_node_add(mc, j, &rkey, rdata, pgno, flags);
        }
 
        /* reset back to original page */
@@ -5539,7 +5546,7 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
                return EINVAL;
        }
 
-       if ((flags & (MDB_NOOVERWRITE|MDB_NODUPDATA)) != flags)
+       if ((flags & (MDB_NOOVERWRITE|MDB_NODUPDATA|MDB_RESERVE)) != flags)
                return EINVAL;
 
        mdb_cursor_init(&mc, txn, dbi, &mx);
index af82e56..3bb7e4c 100644 (file)
@@ -189,6 +189,10 @@ typedef void (MDB_rel_func)(MDB_val *item, void *oldptr, void *newptr, void *rel
 #define MDB_NODUPDATA  0x20
 /** For mdb_cursor_put: overwrite the current key/data pair */
 #define MDB_CURRENT    0x40
+/** For put: Just reserve space for data, don't copy it. Return a
+ * pointer to the reserved space.
+ */
+#define MDB_RESERVE    0x10000
 /*     @} */
 
 /** @brief Cursor Get operations.