Append tweaks, page_split fixes
authorHoward Chu <hyc@symas.com>
Sat, 21 Jul 2012 13:10:02 +0000 (06:10 -0700)
committerHoward Chu <hyc@symas.com>
Sat, 21 Jul 2012 13:19:09 +0000 (06:19 -0700)
Append mode now does no key comparisons, input must be in sorted order.
page_split was not updating cursor parents correctly.

libraries/libmdb/mdb.c
libraries/libmdb/mdb.h

index 4588e75..ad9ff44 100644 (file)
@@ -843,6 +843,7 @@ struct MDB_cursor {
 #define C_SUB  0x04                    /**< Cursor is a sub-cursor */
 #define C_SHADOW       0x08            /**< Cursor is a dup from a parent txn */
 #define C_ALLOCD       0x10            /**< Cursor was malloc'd */
+#define C_SPLITTING    0x20            /**< Cursor is in page_split */
 /** @} */
        unsigned int    mc_flags;       /**< @ref mdb_cursor */
        MDB_page        *mc_pg[CURSOR_STACK];   /**< stack of pushed pages */
@@ -937,6 +938,8 @@ static int  mdb_page_search_root(MDB_cursor *mc,
 static int  mdb_page_search(MDB_cursor *mc,
                            MDB_val *key, int flags);
 static int     mdb_page_merge(MDB_cursor *csrc, MDB_cursor *cdst);
+
+#define MDB_SPLIT_REPLACE      MDB_APPENDDUP   /**< newkey is not new */
 static int     mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata,
                                pgno_t newpgno, unsigned int nflags);
 
@@ -1055,13 +1058,31 @@ mdb_page_keys(MDB_page *mp)
        DKBUF;
 
        nkeys = NUMKEYS(mp);
-       DPRINTF("numkeys %d", nkeys);
+       fprintf(stderr, "numkeys %d\n", nkeys);
        for (i=0; i<nkeys; i++) {
                node = NODEPTR(mp, i);
                key.mv_size = node->mn_ksize;
                key.mv_data = node->mn_data;
-               DPRINTF("key %d: %s", i, DKEY(&key));
+               fprintf(stderr, "key %d: %s\n", i, DKEY(&key));
+       }
+}
+
+void
+mdb_cursor_chk(MDB_cursor *mc)
+{
+       unsigned int i;
+       MDB_node *node;
+       MDB_page *mp;
+
+       if (!mc->mc_snum) return;
+       for (i=0; i<mc->mc_top; i++) {
+               mp = mc->mc_pg[i];
+               node = NODEPTR(mp, mc->mc_ki[i]);
+               if (NODEPGNO(node) != mc->mc_pg[i+1]->mp_pgno)
+                       printf("oops!\n");
        }
+       if (mc->mc_ki[i] >= NUMKEYS(mc->mc_pg[i]))
+               printf("ack!\n");
 }
 #endif
 
@@ -3989,23 +4010,24 @@ mdb_cursor_last(MDB_cursor *mc, MDB_val *key, MDB_val *data)
 {
        int              rc;
        MDB_node        *leaf;
-       MDB_val lkey;
 
-       lkey.mv_size = MAXKEYSIZE+1;
-       lkey.mv_data = NULL;
+       if (!(mc->mc_flags & C_EOF)) {
 
        if (!(mc->mc_flags & C_INITIALIZED) || mc->mc_top) {
+               MDB_val lkey;
+
+               lkey.mv_size = MAXKEYSIZE+1;
+               lkey.mv_data = NULL;
                rc = mdb_page_search(mc, &lkey, 0);
                if (rc != MDB_SUCCESS)
                        return rc;
        }
        assert(IS_LEAF(mc->mc_pg[mc->mc_top]));
 
-       leaf = NODEPTR(mc->mc_pg[mc->mc_top], NUMKEYS(mc->mc_pg[mc->mc_top])-1);
-       mc->mc_flags |= C_INITIALIZED;
-       mc->mc_flags &= ~C_EOF;
-
        mc->mc_ki[mc->mc_top] = NUMKEYS(mc->mc_pg[mc->mc_top]) - 1;
+       mc->mc_flags |= C_INITIALIZED|C_EOF;
+       }
+       leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
 
        if (IS_LEAF2(mc->mc_pg[mc->mc_top])) {
                key->mv_size = mc->mc_db->md_pad;
@@ -4104,9 +4126,10 @@ fetchm:
        case MDB_PREV:
        case MDB_PREV_DUP:
        case MDB_PREV_NODUP:
-               if (!(mc->mc_flags & C_INITIALIZED) || (mc->mc_flags & C_EOF))
+               if (!(mc->mc_flags & C_INITIALIZED) || (mc->mc_flags & C_EOF)) {
                        rc = mdb_cursor_last(mc, key, data);
-               else
+                       mc->mc_flags &= ~C_EOF;
+               } else
                        rc = mdb_cursor_prev(mc, key, data, op);
                break;
        case MDB_FIRST:
@@ -4178,7 +4201,7 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
        MDB_val xdata, *rdata, dkey;
        MDB_page        *fp;
        MDB_db dummy;
-       int do_sub = 0;
+       int do_sub = 0, insert = 0;
        unsigned int mcount = 0;
        size_t nsize;
        int rc, rc2;
@@ -4220,7 +4243,16 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
        } else {
                int exact = 0;
                MDB_val d2;
+               if (flags & MDB_APPEND) {
+                       MDB_val k2;
+                       rc = mdb_cursor_last(mc, &k2, &d2);
+                       if (rc == 0) {
+                               rc = MDB_NOTFOUND;
+                               mc->mc_ki[mc->mc_top]++;
+                       }
+               } else {
                rc = mdb_cursor_set(mc, key, &d2, MDB_SET, &exact);
+               }
                if ((flags & MDB_NOOVERWRITE) && rc == 0) {
                        DPRINTF("duplicate key [%s]", DKEY(key));
                        *data = d2;
@@ -4419,6 +4451,7 @@ current:
                mc->mc_db->md_entries--;
        } else {
                DPRINTF("inserting key at index %i", mc->mc_ki[mc->mc_top]);
+               insert = 1;
        }
 
        rdata = data;
@@ -4429,11 +4462,13 @@ new_sub:
        if (SIZELEFT(mc->mc_pg[mc->mc_top]) < nsize) {
                if (( flags & (F_DUPDATA|F_SUBDATA)) == F_DUPDATA )
                        nflags &= ~MDB_APPEND;
+               if (!insert)
+                       nflags |= MDB_SPLIT_REPLACE;
                rc = mdb_page_split(mc, key, rdata, P_INVALID, nflags);
        } else {
                /* There is room already in this leaf page. */
                rc = mdb_node_add(mc, mc->mc_ki[mc->mc_top], key, rdata, 0, nflags);
-               if (rc == 0 && !do_sub) {
+               if (rc == 0 && !do_sub && insert) {
                        /* Adjust other cursors pointing to mp */
                        MDB_cursor *m2, *m3;
                        MDB_dbi dbi = mc->mc_dbi;
@@ -4495,7 +4530,8 @@ put_sub:
                                        }
                                }
                        }
-                       xflags |= (flags & MDB_APPEND);
+                       if (flags & MDB_APPENDDUP)
+                               xflags |= MDB_APPEND;
                        rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, data, &xdata, xflags);
                        if (flags & F_SUBDATA) {
                                void *db = NODEDATA(leaf);
@@ -5760,6 +5796,7 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
                DPRINTF("parent branch page is %zu", mc->mc_pg[ptop]->mp_pgno);
        }
 
+       mc->mc_flags |= C_SPLITTING;
        mdb_cursor_copy(mc, &mn);
        mn.mc_pg[mn.mc_top] = rp;
        mn.mc_ki[ptop] = mc->mc_ki[ptop]+1;
@@ -5767,8 +5804,8 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
        if (nflags & MDB_APPEND) {
                mn.mc_ki[mn.mc_top] = 0;
                sepkey = *newkey;
+               split_indx = newindx;
                nkeys = 0;
-               split_indx = 0;
                goto newsep;
        }
 
@@ -5900,14 +5937,32 @@ newsep:
                 */
                if (mn.mc_pg[ptop] != mc->mc_pg[ptop] &&
                    mc->mc_ki[ptop] >= NUMKEYS(mc->mc_pg[ptop])) {
-                       mc->mc_pg[ptop] = mn.mc_pg[ptop];
-                       mc->mc_ki[ptop] = mn.mc_ki[ptop] - 1;
+                       /* root split? */
+                       if (mn.mc_snum == mc->mc_snum) {
+                               mc->mc_pg[mc->mc_snum] = mc->mc_pg[mc->mc_top];
+                               mc->mc_ki[mc->mc_snum] = mc->mc_ki[mc->mc_top];
+                               mc->mc_ki[mc->mc_top] = mn.mc_ki[mc->mc_top] - 1;
+                               mc->mc_pg[mc->mc_top] = mn.mc_pg[mc->mc_top];
+                               for (i=0; i<mc->mc_top; i++) {
+                                       mc->mc_pg[i] = mn.mc_pg[i];
+                                       mc->mc_ki[i] = mn.mc_ki[i];
+                               }
+                               mc->mc_snum++;
+                               mc->mc_top++;
+                               ptop++;
+                       } else {
+                               for (i=0; i<ptop; i++)
+                                       mc->mc_ki[i] = mn.mc_ki[i];
+                               mc->mc_pg[ptop] = mn.mc_pg[ptop];
+                               mc->mc_ki[ptop] = mn.mc_ki[ptop] - 1;
+                       }
                }
        } else {
                mn.mc_top--;
                rc = mdb_node_add(&mn, mn.mc_ki[ptop], &sepkey, NULL, rp->mp_pgno, 0);
                mn.mc_top++;
        }
+       mc->mc_flags ^= C_SPLITTING;
        if (rc != MDB_SUCCESS) {
                return rc;
        }
@@ -5917,6 +5972,8 @@ newsep:
                rc = mdb_node_add(mc, 0, newkey, newdata, newpgno, nflags);
                if (rc)
                        return rc;
+               for (i=0; i<mc->mc_top; i++)
+                       mc->mc_ki[i] = mn.mc_ki[i];
                goto done;
        }
        if (IS_LEAF2(rp)) {
@@ -6001,6 +6058,8 @@ newsep:
                        if (!(node->mn_flags & F_BIGDATA))
                                newdata->mv_data = NODEDATA(node);
                }
+       } else {
+               mc->mc_ki[ptop]++;
        }
 
        /* return tmp page to freelist */
@@ -6024,6 +6083,8 @@ done:
                                m3 = m2;
                        if (!(m3->mc_flags & C_INITIALIZED))
                                continue;
+                       if (m3->mc_flags & C_SPLITTING)
+                               continue;
                        if (new_root) {
                                int k;
                                /* root split */
@@ -6031,16 +6092,28 @@ done:
                                        m3->mc_ki[k+1] = m3->mc_ki[k];
                                        m3->mc_pg[k+1] = m3->mc_pg[k];
                                }
-                               m3->mc_ki[0] = mc->mc_ki[0];
+                               if (m3->mc_ki[0] >= split_indx) {
+                                       m3->mc_ki[0] = 1;
+                               } else {
+                                       m3->mc_ki[0] = 0;
+                               }
                                m3->mc_pg[0] = mc->mc_pg[0];
                                m3->mc_snum++;
                                m3->mc_top++;
                        }
                        if (m3->mc_pg[mc->mc_top] == mp) {
-                               if (m3->mc_ki[m3->mc_top] >= split_indx) {
-                                       m3->mc_pg[m3->mc_top] = rp;
-                                       m3->mc_ki[m3->mc_top] -= split_indx;
+                               if (m3->mc_ki[mc->mc_top] >= newindx && !(nflags & MDB_SPLIT_REPLACE))
+                                       m3->mc_ki[mc->mc_top]++;
+                               if (m3->mc_ki[mc->mc_top] >= split_indx) {
+                                       m3->mc_pg[mc->mc_top] = rp;
+                                       m3->mc_ki[mc->mc_top] -= split_indx;
+                                       if ((nflags & MDB_SPLIT_REPLACE) && !newpos)
+                                               m3->mc_ki[mc->mc_top]--;
+                                       m3->mc_ki[ptop] = mn.mc_ki[ptop];
                                }
+                       } else if (m3->mc_pg[ptop] == mc->mc_pg[ptop] &&
+                               m3->mc_ki[ptop] >= mc->mc_ki[ptop]) {
+                               m3->mc_ki[ptop]++;
                        }
                }
        }
index 587013f..21b84a5 100644 (file)
@@ -201,8 +201,10 @@ typedef void (MDB_rel_func)(MDB_val *item, void *oldptr, void *newptr, void *rel
 #define MDB_RESERVE    0x10000
 /** Data is being appended, don't split full pages. */
 #define MDB_APPEND     0x20000
+/** Duplicate data is being appended, don't split full pages. */
+#define MDB_APPENDDUP  0x40000
 /** Store multiple data items in one call. */
-#define MDB_MULTIPLE   0x40000
+#define MDB_MULTIPLE   0x80000
 /*     @} */
 
 /** @brief Cursor Get operations.
@@ -806,6 +808,16 @@ int  mdb_get(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, MDB_val *data);
         *              #MDB_KEYEXIST if the key already appears in the database, even if
         *              the database supports duplicates (#MDB_DUPSORT). The \b data
         *              parameter will be set to point to the existing item.
+        *      <li>#MDB_RESERVE - reserve space for data of the given size, but
+        *              don't copy the given data. Instead, return a pointer to the
+        *              reserved space, which the caller can fill in later. This saves
+        *              an extra memcpy if the data is being generated later.
+        *      <li>#MDB_APPEND - append the given key/data pair to the end of the
+        *              database. No key comparisons are performed. This option allows
+        *              fast bulk loading when keys are already known to be in the
+        *              correct order. Loading unsorted keys with this flag will cause
+        *              data corruption.
+        *      <li>#MDB_APPENDDUP - as above, but for sorted dup data.
         * </ul>
         * @return A non-zero error value on failure and 0 on success. Some possible
         * errors are:
@@ -921,6 +933,16 @@ int  mdb_cursor_get(MDB_cursor *cursor, MDB_val *key, MDB_val *data,
         *              does not already appear in the database. The function will return
         *              #MDB_KEYEXIST if the key already appears in the database, even if
         *              the database supports duplicates (#MDB_DUPSORT).
+        *      <li>#MDB_RESERVE - reserve space for data of the given size, but
+        *              don't copy the given data. Instead, return a pointer to the
+        *              reserved space, which the caller can fill in later. This saves
+        *              an extra memcpy if the data is being generated later.
+        *      <li>#MDB_APPEND - append the given key/data pair to the end of the
+        *              database. No key comparisons are performed. This option allows
+        *              fast bulk loading when keys are already known to be in the
+        *              correct order. Loading unsorted keys with this flag will cause
+        *              data corruption.
+        *      <li>#MDB_APPENDDUP - as above, but for sorted dup data.
         * </ul>
         * @return A non-zero error value on failure and 0 on success. Some possible
         * errors are: