Sorted dups fix
authorHoward Chu <hyc@symas.com>
Fri, 19 Aug 2011 20:10:51 +0000 (13:10 -0700)
committerHoward Chu <hyc@symas.com>
Thu, 1 Sep 2011 23:31:10 +0000 (16:31 -0700)
Write the first datum normally. Convert to a sub-db if
additional items for the same key are written.

libraries/libmdb/mdb.c

index 0ca8ffd8bda25d2878cc2e2ce35d2d91c818d239..69da48fbe350a8a849558e9a9442e4b1f334ff69 100644 (file)
@@ -262,6 +262,7 @@ typedef struct MDB_node {
        unsigned int    mn_ksize:12;                    /* key size */
 #define F_BIGDATA       0x01                   /* data put on overflow page */
 #define F_SUBDATA       0x02                   /* data is a sub-database */
+#define F_DUPDATA       0x04                   /* data has duplicates */
        char            mn_data[1];
 } MDB_node;
 
@@ -1763,7 +1764,7 @@ mdb_get(MDB_txn *txn, MDB_dbi dbi,
        leaf = mdb_search_node(txn, dbi, mpp.mp_page, key, &exact, NULL);
        if (leaf && exact) {
                /* Return first duplicate data item */
-               if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
+               if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
                        MDB_xcursor mx;
 
                        mdb_xcursor_init0(txn, dbi, &mx);
@@ -1854,17 +1855,18 @@ mdb_cursor_next(MDB_cursor *cursor, MDB_val *key, MDB_val *data, MDB_cursor_op o
 
        assert(cursor->mc_initialized);
 
+       top = CURSOR_TOP(cursor);
+       mp = top->mp_page;
+
        if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
-               if (op == MDB_NEXT || op == MDB_NEXT_DUP) {
+               leaf = NODEPTR(mp, top->mp_ki);
+               if ((op == MDB_NEXT || op == MDB_NEXT_DUP) && F_ISSET(leaf->mn_flags, F_DUPDATA)) {
                        rc = mdb_cursor_next(&cursor->mc_xcursor->mx_cursor, data, NULL, MDB_NEXT);
                        if (op != MDB_NEXT || rc == MDB_SUCCESS)
                                return rc;
                }
        }
 
-       top = CURSOR_TOP(cursor);
-       mp = top->mp_page;
-
        DPRINTF("cursor_next: top page is %lu in cursor %p", mp->mp_pgno, (void *) cursor);
 
        if (top->mp_ki + 1 >= NUMKEYS(mp)) {
@@ -1885,12 +1887,14 @@ mdb_cursor_next(MDB_cursor *cursor, MDB_val *key, MDB_val *data, MDB_cursor_op o
        assert(IS_LEAF(mp));
        leaf = NODEPTR(mp, top->mp_ki);
 
+       if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
+               mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
+       }
        if (data) {
                if ((rc = mdb_read_data(cursor->mc_txn, leaf, data) != MDB_SUCCESS))
                        return rc;
 
-               if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
-                       mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
+               if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
                        rc = mdb_cursor_first(&cursor->mc_xcursor->mx_cursor, data, NULL);
                        if (rc != MDB_SUCCESS)
                                return rc;
@@ -1910,17 +1914,18 @@ mdb_cursor_prev(MDB_cursor *cursor, MDB_val *key, MDB_val *data, MDB_cursor_op o
 
        assert(cursor->mc_initialized);
 
+       top = CURSOR_TOP(cursor);
+       mp = top->mp_page;
+
        if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
-               if (op == MDB_PREV || op == MDB_PREV_DUP) {
+               leaf = NODEPTR(mp, top->mp_ki);
+               if ((op == MDB_PREV || op == MDB_PREV_DUP) && F_ISSET(leaf->mn_flags, F_DUPDATA)) {
                        rc = mdb_cursor_prev(&cursor->mc_xcursor->mx_cursor, data, NULL, MDB_PREV);
                        if (op != MDB_PREV || rc == MDB_SUCCESS)
                                return rc;
                }
        }
 
-       top = CURSOR_TOP(cursor);
-       mp = top->mp_page;
-
        DPRINTF("cursor_prev: top page is %lu in cursor %p", mp->mp_pgno, (void *) cursor);
 
        if (top->mp_ki == 0)  {
@@ -1943,12 +1948,14 @@ mdb_cursor_prev(MDB_cursor *cursor, MDB_val *key, MDB_val *data, MDB_cursor_op o
        assert(IS_LEAF(mp));
        leaf = NODEPTR(mp, top->mp_ki);
 
+       if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
+               mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
+       }
        if (data) {
                if ((rc = mdb_read_data(cursor->mc_txn, leaf, data) != MDB_SUCCESS))
                        return rc;
 
-               if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
-                       mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
+               if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
                        rc = mdb_cursor_last(&cursor->mc_xcursor->mx_cursor, data, NULL);
                        if (rc != MDB_SUCCESS)
                                return rc;
@@ -1999,13 +2006,11 @@ mdb_cursor_set(MDB_cursor *cursor, MDB_val *key, MDB_val *data,
        cursor->mc_initialized = 1;
        cursor->mc_eof = 0;
 
+       if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
+               mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
+       }
        if (data) {
-               MDB_val d2;
-               if ((rc = mdb_read_data(cursor->mc_txn, leaf, &d2)) != MDB_SUCCESS)
-                       return rc;
-
-               if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
-                       mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
+               if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
                        if (op == MDB_SET || op == MDB_SET_RANGE) {
                                rc = mdb_cursor_first(&cursor->mc_xcursor->mx_cursor, data, NULL);
                        } else {
@@ -2023,9 +2028,9 @@ mdb_cursor_set(MDB_cursor *cursor, MDB_val *key, MDB_val *data,
                                        return rc;
                        }
                } else {
-                       *data = d2;
+                       if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS)
+                               return rc;
                }
-
        }
 
        rc = mdb_set_key(leaf, key);
@@ -2057,14 +2062,14 @@ mdb_cursor_first(MDB_cursor *cursor, MDB_val *key, MDB_val *data)
        cursor->mc_eof = 0;
 
        if (data) {
-               if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS)
-                       return rc;
-
-               if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
+               if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
                        mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
                        rc = mdb_cursor_first(&cursor->mc_xcursor->mx_cursor, data, NULL);
                        if (rc)
                                return rc;
+               } else {
+                       if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS)
+                               return rc;
                }
        }
        return mdb_set_key(leaf, key);
@@ -2097,14 +2102,14 @@ mdb_cursor_last(MDB_cursor *cursor, MDB_val *key, MDB_val *data)
        top->mp_ki = NUMKEYS(top->mp_page) - 1;
 
        if (data) {
-               if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS)
-                       return rc;
-
-               if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
+               if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
                        mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
                        rc = mdb_cursor_last(&cursor->mc_xcursor->mx_cursor, data, NULL);
                        if (rc)
                                return rc;
+               } else {
+                       if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS)
+                               return rc;
                }
        }
 
@@ -2451,16 +2456,25 @@ mdb_cursor_open(MDB_txn *txn, MDB_dbi dbi, MDB_cursor **ret)
 int
 mdb_cursor_count(MDB_cursor *mc, unsigned long *countp)
 {
+       MDB_ppage       *top;
+       MDB_node        *leaf;
+
        if (mc == NULL || countp == NULL)
                return EINVAL;
 
        if (!(mc->mc_txn->mt_dbs[mc->mc_dbi].md_flags & MDB_DUPSORT))
                return EINVAL;
 
-       if (!mc->mc_xcursor->mx_cursor.mc_initialized)
-               return EINVAL;
+       top = CURSOR_TOP(mc);
+       leaf = NODEPTR(top->mp_page, top->mp_ki);
+       if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) {
+               *countp = 1;
+       } else {
+               if (!mc->mc_xcursor->mx_cursor.mc_initialized)
+                       return EINVAL;
 
-       *countp = mc->mc_xcursor->mx_txn.mt_dbs[mc->mc_xcursor->mx_cursor.mc_dbi].md_entries;
+               *countp = mc->mc_xcursor->mx_txn.mt_dbs[mc->mc_xcursor->mx_cursor.mc_dbi].md_entries;
+       }
        return MDB_SUCCESS;
 }
 
@@ -2805,7 +2819,7 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi,
                return MDB_NOTFOUND;
        }
 
-       if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
+       if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
                MDB_xcursor mx;
                MDB_pageparent mp2;
 
@@ -3027,26 +3041,46 @@ mdb_put0(MDB_txn *txn, MDB_dbi dbi,
        unsigned int     ki;
        MDB_node        *leaf;
        MDB_pageparent  mpp;
-       MDB_val xdata, *rdata;
+       MDB_val xdata, *rdata, dkey;
        MDB_db dummy;
+       char dbuf[PAGESIZE];
+       int do_sub = 0;
 
        DPRINTF("==> put key %.*s, size %zu, data size %zu",
                (int)key->mv_size, (char *)key->mv_data, key->mv_size, data->mv_size);
 
+       dkey.mv_size = 0;
        mpp.mp_parent = NULL;
        mpp.mp_pi = 0;
        rc = mdb_search_page(txn, dbi, key, NULL, 1, &mpp);
        if (rc == MDB_SUCCESS) {
                leaf = mdb_search_node(txn, dbi, mpp.mp_page, key, &exact, &ki);
                if (leaf && exact) {
-                       if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
-                               goto put_sub;
-                       }
                        if (flags == MDB_NOOVERWRITE) {
                                DPRINTF("duplicate key %.*s",
                                    (int)key->mv_size, (char *)key->mv_data);
                                return MDB_KEYEXIST;
                        }
+                       if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
+                               /* Was a single item before, must convert now */
+                               if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) {
+                                       dkey.mv_size = NODEDSZ(leaf);
+                                       memcpy(dbuf, NODEDATA(leaf), dkey.mv_size);
+                                       memset(&dummy, 0, sizeof(dummy));
+                                       dummy.md_root = P_INVALID;
+                                       if (dkey.mv_size == sizeof(MDB_db)) {
+                                               memcpy(NODEDATA(leaf), &dummy, sizeof(dummy));
+                                               goto put_sub;
+                                       }
+                                       mdb_del_node(mpp.mp_page, ki);
+                                       do_sub = 1;
+                                       rdata = &xdata;
+                                       xdata.mv_size = sizeof(MDB_db);
+                                       xdata.mv_data = &dummy;
+                                       goto new_sub;
+                               }
+                               goto put_sub;
+                       }
                        /* same size, just replace it */
                        if (NODEDSZ(leaf) == data->mv_size) {
                                memcpy(NODEDATA(leaf), data->mv_data, data->mv_size);
@@ -3078,20 +3112,9 @@ mdb_put0(MDB_txn *txn, MDB_dbi dbi,
        DPRINTF("there are %u keys, should insert new key at index %i",
                NUMKEYS(mpp.mp_page), ki);
 
-       /* For sorted dups, the data item at this level is a DB record
-        * for a child DB; the actual data elements are stored as keys
-        * in the child DB.
-        */
-       if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
-               rdata = &xdata;
-               xdata.mv_size = sizeof(MDB_db);
-               xdata.mv_data = &dummy;
-               memset(&dummy, 0, sizeof(dummy));
-               dummy.md_root = P_INVALID;
-       } else {
-               rdata = data;
-       }
+       rdata = data;
 
+new_sub:
        if (SIZELEFT(mpp.mp_page) < mdb_leaf_size(txn->mt_env, key, rdata)) {
                rc = mdb_split(txn, dbi, &mpp.mp_page, &ki, key, rdata, P_INVALID);
        } else {
@@ -3115,7 +3138,7 @@ mdb_put0(MDB_txn *txn, MDB_dbi dbi,
                 * size limits on dupdata. The actual data fields of the child
                 * DB are all zero size.
                 */
-               if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
+               if (do_sub) {
                        MDB_xcursor mx;
 
                        leaf = NODEPTR(mpp.mp_page, ki);
@@ -3126,6 +3149,13 @@ put_sub:
                        xdata.mv_data = "";
                        if (flags == MDB_NODUPDATA)
                                flags = MDB_NOOVERWRITE;
+                       /* converted, write the original data first */
+                       if (dkey.mv_size) {
+                               dkey.mv_data = dbuf;
+                               rc = mdb_put0(&mx.mx_txn, mx.mx_cursor.mc_dbi, &dkey, &xdata, flags);
+                               if (rc) return rc;
+                               leaf->mn_flags |= F_DUPDATA;
+                       }
                        rc = mdb_put0(&mx.mx_txn, mx.mx_cursor.mc_dbi, data, &xdata, flags);
                        mdb_xcursor_fini(txn, dbi, &mx);
                        memcpy(NODEDATA(leaf), &mx.mx_txn.mt_dbs[mx.mx_cursor.mc_dbi],