Track changes to all cursors.
authorHoward Chu <hyc@symas.com>
Wed, 21 Sep 2011 21:14:11 +0000 (14:14 -0700)
committerHoward Chu <hyc@symas.com>
Wed, 21 Sep 2011 22:55:27 +0000 (15:55 -0700)
For any change to a page or node, update all other cursors pointing
at the same page (or node). Cursors are now stored in a linked list
off their owning transaction. Cursors are all closed when the transaction
ends. Cursors in parent transactions are updated when their child
transaction commits.

libraries/libmdb/mdb.c
libraries/libmdb/mdb.h

index 143b5a1..28d5001 100644 (file)
@@ -731,6 +731,8 @@ struct MDB_txn {
 #define DB_DIRTY       0x01            /**< DB was written in this txn */
 #define DB_STALE       0x02            /**< DB record is older than txnID */
 /** @} */
+       /** Array of cursors for each DB */
+       MDB_cursor      **mt_cursors;
        /** Array of flags for each DB */
        unsigned char   *mt_dbflags;
        /**     Number of DB records in use. This number only ever increments;
@@ -762,6 +764,10 @@ struct MDB_xcursor;
 
        /** Cursors are used for all DB operations */
 struct MDB_cursor {
+       /** Next cursor on this DB in this txn */
+       MDB_cursor      *mc_next;
+       /** Original cursor if this is a shadow */
+       MDB_cursor      *mc_orig;
        /** Context used for databases with #MDB_DUPSORT, otherwise NULL */
        struct MDB_xcursor      *mc_xcursor;
        /** The transaction that owns this cursor */
@@ -783,6 +789,8 @@ struct MDB_cursor {
  */
 #define C_INITIALIZED  0x01    /**< cursor has been initialized and is valid */
 #define C_EOF  0x02                    /**< No more data */
+#define C_SUB  0x04                    /**< Cursor is a sub-cursor */
+#define C_SHADOW       0x08                    /**< Cursor is a dup from a parent txn */
 /** @} */
        unsigned int    mc_flags;       /**< @ref mdb_cursor */
        MDB_page        *mc_pg[CURSOR_STACK];   /**< stack of pushed pages */
@@ -1160,6 +1168,26 @@ mdb_page_touch(MDB_cursor *mc)
                mp->mp_flags |= P_DIRTY;
 
 finish:
+               /* Adjust other cursors pointing to mp */
+               if (mc->mc_flags & C_SUB) {
+                       MDB_cursor *m2, *m3;
+                       MDB_dbi dbi = mc->mc_dbi-1;
+
+                       for (m2 = mc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) {
+                               m3 = &m2->mc_xcursor->mx_cursor;
+                               if (m3->mc_pg[mc->mc_top] == mc->mc_pg[mc->mc_top]) {
+                                       m3->mc_pg[mc->mc_top] = mp;
+                               }
+                       }
+               } else {
+                       MDB_cursor *m2;
+
+                       for (m2 = mc->mc_txn->mt_cursors[mc->mc_dbi]; m2; m2=m2->mc_next) {
+                               if (m2->mc_pg[mc->mc_top] == mc->mc_pg[mc->mc_top]) {
+                                       m2->mc_pg[mc->mc_top] = mp;
+                               }
+                       }
+               }
                mc->mc_pg[mc->mc_top] = mp;
                /** If this page has a parent, update the parent to point to
                 * this new page.
@@ -1208,6 +1236,92 @@ mdb_env_sync(MDB_env *env, int force)
        return rc;
 }
 
+/** Make shadow copies of all of parent txn's cursors */
+static int
+mdb_cursor_shadow(MDB_txn *src, MDB_txn *dst)
+{
+       MDB_cursor *mc, *m2;
+       unsigned int i, j, size;
+
+       for (i=0;i<src->mt_numdbs; i++) {
+               if (src->mt_cursors[i]) {
+                       size = sizeof(MDB_cursor);
+                       if (src->mt_cursors[i]->mc_xcursor)
+                               size += sizeof(MDB_xcursor);
+                       for (m2 = src->mt_cursors[i]; m2; m2=m2->mc_next) {
+                               mc = malloc(size);
+                               if (!mc)
+                                       return ENOMEM;
+                               mc->mc_orig = m2;
+                               mc->mc_txn = dst;
+                               mc->mc_dbi = i;
+                               mc->mc_db = &dst->mt_dbs[i];
+                               mc->mc_dbx = m2->mc_dbx;
+                               mc->mc_dbflag = &dst->mt_dbflags[i];
+                               mc->mc_snum = m2->mc_snum;
+                               mc->mc_top = m2->mc_top;
+                               mc->mc_flags = m2->mc_flags | C_SHADOW;
+                               for (j=0; j<mc->mc_snum; j++) {
+                                       mc->mc_pg[j] = m2->mc_pg[j];
+                                       mc->mc_ki[j] = m2->mc_ki[j];
+                               }
+                               if (m2->mc_xcursor) {
+                                       MDB_xcursor *mx, *mx2;
+                                       mx = (MDB_xcursor *)(mc+1);
+                                       mc->mc_xcursor = mx;
+                                       mx2 = m2->mc_xcursor;
+                                       mx->mx_db = mx2->mx_db;
+                                       mx->mx_dbx = mx2->mx_dbx;
+                                       mx->mx_dbflag = mx2->mx_dbflag;
+                                       mx->mx_cursor.mc_txn = dst;
+                                       mx->mx_cursor.mc_dbi = mx2->mx_cursor.mc_dbi;
+                                       mx->mx_cursor.mc_db = &mx->mx_db;
+                                       mx->mx_cursor.mc_dbx = &mx->mx_dbx;
+                                       mx->mx_cursor.mc_dbflag = &mx->mx_dbflag;
+                                       mx->mx_cursor.mc_snum = mx2->mx_cursor.mc_snum;
+                                       mx->mx_cursor.mc_top = mx2->mx_cursor.mc_top;
+                                       mx->mx_cursor.mc_flags = mx2->mx_cursor.mc_flags | C_SHADOW;
+                                       for (j=0; j<mx2->mx_cursor.mc_snum; j++) {
+                                               mx->mx_cursor.mc_pg[j] = mx2->mx_cursor.mc_pg[j];
+                                               mx->mx_cursor.mc_ki[j] = mx2->mx_cursor.mc_ki[j];
+                                       }
+                               } else {
+                                       mc->mc_xcursor = NULL;
+                               }
+                               mc->mc_next = dst->mt_cursors[i];
+                               dst->mt_cursors[i] = mc;
+                       }
+               }
+       }
+       return MDB_SUCCESS;
+}
+
+/** Merge shadow cursors back into parent's */
+static void
+mdb_cursor_merge(MDB_txn *txn)
+{
+       MDB_dbi i;
+       for (i=0; i<txn->mt_numdbs; i++) {
+               if (txn->mt_cursors[i]) {
+                       MDB_cursor *mc;
+                       while ((mc = txn->mt_cursors[i])) {
+                               txn->mt_cursors[i] = mc->mc_next;
+                               if (mc->mc_flags & C_SHADOW) {
+                                       MDB_cursor *m2 = mc->mc_orig;
+                                       unsigned int j;
+                                       m2->mc_snum = mc->mc_snum;
+                                       m2->mc_top = mc->mc_top;
+                                       for (j=0; j<mc->mc_snum; j++) {
+                                               m2->mc_pg[j] = mc->mc_pg[j];
+                                               m2->mc_ki[j] = mc->mc_ki[j];
+                                       }
+                               }
+                               free(mc);
+                       }
+               }
+       }
+}
+
 static void
 mdb_txn_reset0(MDB_txn *txn);
 
@@ -1312,7 +1426,7 @@ int
 mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned int flags, MDB_txn **ret)
 {
        MDB_txn *txn;
-       int rc;
+       int rc, size;
 
        if (env->me_flags & MDB_FATAL_ERROR) {
                DPUTS("environment had fatal error, must shutdown!");
@@ -1324,16 +1438,22 @@ mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned int flags, MDB_txn **ret)
                        return EINVAL;
                }
        }
-       if ((txn = calloc(1, sizeof(MDB_txn) +
-               env->me_maxdbs * (sizeof(MDB_db)+1))) == NULL) {
+       size = sizeof(MDB_txn) + env->me_maxdbs * (sizeof(MDB_db)+1);
+       if (!(flags & MDB_RDONLY))
+               size += env->me_maxdbs * sizeof(MDB_cursor *);
+
+       if ((txn = calloc(1, size)) == NULL) {
                DPRINTF("calloc: %s", strerror(ErrCode()));
                return ENOMEM;
        }
        txn->mt_dbs = (MDB_db *)(txn+1);
        if (flags & MDB_RDONLY) {
                txn->mt_flags |= MDB_TXN_RDONLY;
+               txn->mt_dbflags = (unsigned char *)(txn->mt_dbs + env->me_maxdbs);
+       } else {
+               txn->mt_cursors = (MDB_cursor **)(txn->mt_dbs + env->me_maxdbs);
+               txn->mt_dbflags = (unsigned char *)(txn->mt_cursors + env->me_maxdbs);
        }
-       txn->mt_dbflags = (unsigned char *)(txn->mt_dbs + env->me_maxdbs);
        txn->mt_env = env;
 
        if (parent) {
@@ -1359,6 +1479,7 @@ mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned int flags, MDB_txn **ret)
                txn->mt_dbxs = parent->mt_dbxs;
                memcpy(txn->mt_dbs, parent->mt_dbs, txn->mt_numdbs * sizeof(MDB_db));
                memcpy(txn->mt_dbflags, parent->mt_dbflags, txn->mt_numdbs);
+               mdb_cursor_shadow(parent, txn);
                rc = 0;
        } else {
                rc = mdb_txn_renew0(txn);
@@ -1390,6 +1511,17 @@ mdb_txn_reset0(MDB_txn *txn)
                MDB_page *dp;
                unsigned int i;
 
+               /* close(free) all cursors */
+               for (i=0; i<txn->mt_numdbs; i++) {
+                       if (txn->mt_cursors[i]) {
+                               MDB_cursor *mc;
+                               while ((mc = txn->mt_cursors[i])) {
+                                       txn->mt_cursors[i] = mc->mc_next;
+                                       free(mc);
+                               }
+                       }
+               }
+
                /* return all dirty pages to dpage list */
                for (i=1; i<=txn->mt_u.dirty_list[0].mid; i++) {
                        dp = txn->mt_u.dirty_list[i].mptr;
@@ -1497,7 +1629,6 @@ mdb_txn_commit(MDB_txn *txn)
                return MDB_SUCCESS;
        }
 
-
        if (F_ISSET(txn->mt_flags, MDB_TXN_ERROR)) {
                DPUTS("error flag is set, can't commit");
                if (txn->mt_parent)
@@ -1506,6 +1637,9 @@ mdb_txn_commit(MDB_txn *txn)
                return EINVAL;
        }
 
+       /* Merge (and close) our cursors with parent's */
+       mdb_cursor_merge(txn);
+
        if (txn->mt_parent) {
                MDB_db *ip, *jp;
                MDB_dbi i;
@@ -1532,21 +1666,11 @@ mdb_txn_commit(MDB_txn *txn)
                src = txn->mt_u.dirty_list;
                x = mdb_mid2l_search(dst, src[1].mid);
                for (y=1; y<=src[0].mid; y++) {
-                       MDB_page *mp;
                        while (x <= dst[0].mid && dst[x].mid != src[y].mid) x++;
                        if (x > dst[0].mid)
                                break;
-                       mp = dst[x].mptr;
-                       dp = src[y].mptr;
-                       mp->mp_lower = dp->mp_lower;
-                       mp->mp_upper = dp->mp_upper;
-                       if (IS_LEAF2(mp)) {
-                               memcpy(METADATA(mp), METADATA(dp), NUMKEYS(mp) * mp->mp_pad);
-                       } else {
-                               memcpy((char *)mp+mp->mp_upper, (char *)dp+dp->mp_upper,
-                                       txn->mt_env->me_psize - mp->mp_upper);
-                       }
-                       free(src[y].mptr);
+                       free(dst[x].mptr);
+                       dst[x].mptr = src[y].mptr;
                }
                x = dst[0].mid;
                for (; y<=src[0].mid; y++) {
@@ -2838,6 +2962,20 @@ mdb_node_search(MDB_cursor *mc, MDB_val *key, int *exactp)
        return node;
 }
 
+#if 0
+static void
+mdb_cursor_adjust(MDB_cursor *mc, func)
+{
+       MDB_cursor *m2;
+
+       for (m2 = mc->mc_txn->mt_cursors[mc->mc_dbi]; m2; m2=m2->mc_next) {
+               if (m2->mc_pg[m2->mc_top] == mc->mc_pg[mc->mc_top]) {
+                       func(mc, m2);
+               }
+       }
+}
+#endif
+
 /** Pop a page off the top of the cursor's stack. */
 static void
 mdb_cursor_pop(MDB_cursor *mc)
@@ -3790,12 +3928,13 @@ top:
                                if (flags == MDB_CURRENT)
                                        goto current;
 
-                               /* create a fake page for the dup items */
                                dkey.mv_size = NODEDSZ(leaf);
                                dkey.mv_data = NODEDATA(leaf);
                                /* data matches, ignore it */
                                if (!mc->mc_dbx->md_dcmp(data, &dkey))
                                        return (flags == MDB_NODUPDATA) ? MDB_KEYEXIST : MDB_SUCCESS;
+
+                               /* create a fake page for the dup items */
                                memcpy(dbuf, dkey.mv_data, dkey.mv_size);
                                dkey.mv_data = dbuf;
                                fp = (MDB_page *)pbuf;
@@ -3913,6 +4052,26 @@ new_sub:
        } else {
                /* There is room already in this leaf page. */
                rc = mdb_node_add(mc, mc->mc_ki[mc->mc_top], key, rdata, 0, 0);
+               if (rc == 0 && !do_sub) {
+                       /* Adjust other cursors pointing to mp */
+                       MDB_cursor *m2, *m3;
+                       MDB_dbi dbi = mc->mc_dbi;
+                       unsigned i = mc->mc_top;
+                       MDB_page *mp = mc->mc_pg[i];
+
+                       if (mc->mc_flags & C_SUB)
+                               dbi--;
+
+                       for (m2 = mc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) {
+                               if (mc->mc_flags & C_SUB)
+                                       m3 = &m2->mc_xcursor->mx_cursor;
+                               else
+                                       m3 = m2;
+                               if (m3->mc_pg[i] == mp && m3->mc_ki[i] >= mc->mc_ki[i]) {
+                                       m3->mc_ki[i]++;
+                               }
+                       }
+               }
        }
 
        if (rc != MDB_SUCCESS)
@@ -3946,6 +4105,18 @@ put_sub:
                                rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, &dkey, &xdata, xflags);
                                if (rc)
                                        return rc;
+                               {
+                                       /* Adjust other cursors pointing to mp */
+                                       MDB_cursor *m2;
+                                       unsigned i = mc->mc_top;
+                                       MDB_page *mp = mc->mc_pg[i];
+
+                                       for (m2 = mc->mc_txn->mt_cursors[mc->mc_dbi]; m2; m2=m2->mc_next) {
+                                               if (m2->mc_pg[i] == mp && m2->mc_ki[i] == mc->mc_ki[i]) {
+                                                       mdb_xcursor_init1(m2, leaf);
+                                               }
+                                       }
+                               }
                        }
                        rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, data, &xdata, xflags);
                        if (flags & F_SUBDATA) {
@@ -4373,7 +4544,7 @@ mdb_xcursor_init1(MDB_cursor *mc, MDB_node *node)
                MDB_db *db = NODEDATA(node);
                mx->mx_db = *db;
                mx->mx_cursor.mc_snum = 0;
-               mx->mx_cursor.mc_flags = 0;
+               mx->mx_cursor.mc_flags = C_SUB;
        } else {
                MDB_page *fp = NODEDATA(node);
                mx->mx_db.md_pad = mc->mc_pg[mc->mc_top]->mp_pad;
@@ -4385,7 +4556,7 @@ mdb_xcursor_init1(MDB_cursor *mc, MDB_node *node)
                mx->mx_db.md_entries = NUMKEYS(fp);
                mx->mx_db.md_root = fp->mp_pgno;
                mx->mx_cursor.mc_snum = 1;
-               mx->mx_cursor.mc_flags = C_INITIALIZED;
+               mx->mx_cursor.mc_flags = C_INITIALIZED|C_SUB;
                mx->mx_cursor.mc_top = 0;
                mx->mx_cursor.mc_pg[0] = fp;
                mx->mx_cursor.mc_ki[0] = 0;
@@ -4410,6 +4581,7 @@ mdb_xcursor_init1(MDB_cursor *mc, MDB_node *node)
 static void
 mdb_cursor_init(MDB_cursor *mc, MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx)
 {
+       mc->mc_orig = NULL;
        mc->mc_dbi = dbi;
        mc->mc_txn = txn;
        mc->mc_db = &txn->mt_dbs[dbi];
@@ -4444,6 +4616,10 @@ mdb_cursor_open(MDB_txn *txn, MDB_dbi dbi, MDB_cursor **ret)
                        mx = (MDB_xcursor *)(mc + 1);
                }
                mdb_cursor_init(mc, txn, dbi, mx);
+               if (txn->mt_cursors) {
+                       mc->mc_next = txn->mt_cursors[dbi];
+                       txn->mt_cursors[dbi] = mc;
+               }
        } else {
                return ENOMEM;
        }
@@ -4481,6 +4657,13 @@ void
 mdb_cursor_close(MDB_cursor *mc)
 {
        if (mc != NULL) {
+               /* remove from txn, if tracked */
+               if (mc->mc_txn->mt_cursors) {
+                       MDB_cursor **prev = &mc->mc_txn->mt_cursors[mc->mc_dbi];
+                       while (*prev && *prev != mc) prev = &(*prev)->mc_next;
+                       if (*prev == mc)
+                               *prev = mc->mc_next;
+               }
                free(mc);
        }
 }
@@ -4594,6 +4777,28 @@ mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst)
         */
        mdb_node_del(csrc->mc_pg[csrc->mc_top], csrc->mc_ki[csrc->mc_top], key.mv_size);
 
+       {
+               /* Adjust other cursors pointing to mp */
+               MDB_cursor *m2, *m3;
+               MDB_dbi dbi = csrc->mc_dbi;
+               MDB_page *mp = csrc->mc_pg[csrc->mc_top];
+
+               if (csrc->mc_flags & C_SUB)
+                       dbi--;
+
+               for (m2 = csrc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) {
+                       if (csrc->mc_flags & C_SUB)
+                               m3 = &m2->mc_xcursor->mx_cursor;
+                       else
+                               m3 = m2;
+                       if (m3->mc_pg[csrc->mc_top] == mp && m3->mc_ki[csrc->mc_top] ==
+                               csrc->mc_ki[csrc->mc_top]) {
+                               m3->mc_pg[csrc->mc_top] = cdst->mc_pg[cdst->mc_top];
+                               m3->mc_ki[csrc->mc_top] = cdst->mc_ki[cdst->mc_top];
+                       }
+               }
+       }
+
        /* Update the parent separators.
         */
        if (csrc->mc_ki[csrc->mc_top] == 0) {
@@ -4657,6 +4862,7 @@ mdb_page_merge(MDB_cursor *csrc, MDB_cursor *cdst)
        indx_t                   i, j;
        MDB_node                *srcnode;
        MDB_val          key, data;
+       unsigned        nkeys;
 
        DPRINTF("merging page %zu into %zu", csrc->mc_pg[csrc->mc_top]->mp_pgno,
                cdst->mc_pg[cdst->mc_top]->mp_pgno);
@@ -4670,7 +4876,7 @@ mdb_page_merge(MDB_cursor *csrc, MDB_cursor *cdst)
 
        /* Move all nodes from src to dst.
         */
-       j = NUMKEYS(cdst->mc_pg[cdst->mc_top]);
+       j = nkeys = NUMKEYS(cdst->mc_pg[cdst->mc_top]);
        if (IS_LEAF2(csrc->mc_pg[csrc->mc_top])) {
                key.mv_size = csrc->mc_db->md_pad;
                key.mv_data = METADATA(csrc->mc_pg[csrc->mc_top]);
@@ -4711,6 +4917,26 @@ mdb_page_merge(MDB_cursor *csrc, MDB_cursor *cdst)
                csrc->mc_db->md_leaf_pages--;
        else
                csrc->mc_db->md_branch_pages--;
+       {
+               /* Adjust other cursors pointing to mp */
+               MDB_cursor *m2, *m3;
+               MDB_dbi dbi = csrc->mc_dbi;
+               MDB_page *mp = cdst->mc_pg[cdst->mc_top];
+
+               if (csrc->mc_flags & C_SUB)
+                       dbi--;
+
+               for (m2 = csrc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) {
+                       if (csrc->mc_flags & C_SUB)
+                               m3 = &m2->mc_xcursor->mx_cursor;
+                       else
+                               m3 = m2;
+                       if (m3->mc_pg[csrc->mc_top] == csrc->mc_pg[csrc->mc_top]) {
+                               m3->mc_pg[csrc->mc_top] = mp;
+                               m3->mc_ki[csrc->mc_top] += nkeys;
+                       }
+               }
+       }
        mdb_cursor_pop(csrc);
 
        return mdb_rebalance(csrc);
@@ -4763,22 +4989,61 @@ mdb_rebalance(MDB_cursor *mc)
        }
 
        if (mc->mc_snum < 2) {
-               if (NUMKEYS(mc->mc_pg[mc->mc_top]) == 0) {
+               MDB_page *mp = mc->mc_pg[0];
+               if (NUMKEYS(mp) == 0) {
                        DPUTS("tree is completely empty");
                        mc->mc_db->md_root = P_INVALID;
                        mc->mc_db->md_depth = 0;
                        mc->mc_db->md_leaf_pages = 0;
-                       mdb_midl_append(&mc->mc_txn->mt_free_pgs, mc->mc_pg[mc->mc_top]->mp_pgno);
+                       mdb_midl_append(&mc->mc_txn->mt_free_pgs, mp->mp_pgno);
                        mc->mc_snum = 0;
-               } else if (IS_BRANCH(mc->mc_pg[mc->mc_top]) && NUMKEYS(mc->mc_pg[mc->mc_top]) == 1) {
+                       mc->mc_top = 0;
+                       {
+                               /* Adjust other cursors pointing to mp */
+                               MDB_cursor *m2, *m3;
+                               MDB_dbi dbi = mc->mc_dbi;
+
+                               if (mc->mc_flags & C_SUB)
+                                       dbi--;
+
+                               for (m2 = mc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) {
+                                       if (mc->mc_flags & C_SUB)
+                                               m3 = &m2->mc_xcursor->mx_cursor;
+                                       else
+                                               m3 = m2;
+                                       if (m3->mc_pg[0] == mp) {
+                                               m3->mc_snum = 0;
+                                               m3->mc_top = 0;
+                                       }
+                               }
+                       }
+               } else if (IS_BRANCH(mp) && NUMKEYS(mp) == 1) {
                        DPUTS("collapsing root page!");
-                       mdb_midl_append(&mc->mc_txn->mt_free_pgs, mc->mc_pg[mc->mc_top]->mp_pgno);
-                       mc->mc_db->md_root = NODEPGNO(NODEPTR(mc->mc_pg[mc->mc_top], 0));
+                       mdb_midl_append(&mc->mc_txn->mt_free_pgs, mp->mp_pgno);
+                       mc->mc_db->md_root = NODEPGNO(NODEPTR(mp, 0));
                        if ((rc = mdb_page_get(mc->mc_txn, mc->mc_db->md_root,
-                               &mc->mc_pg[mc->mc_top])))
+                               &mc->mc_pg[0])))
                                return rc;
                        mc->mc_db->md_depth--;
                        mc->mc_db->md_branch_pages--;
+                       {
+                               /* Adjust other cursors pointing to mp */
+                               MDB_cursor *m2, *m3;
+                               MDB_dbi dbi = mc->mc_dbi;
+
+                               if (mc->mc_flags & C_SUB)
+                                       dbi--;
+
+                               for (m2 = mc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) {
+                                       if (mc->mc_flags & C_SUB)
+                                               m3 = &m2->mc_xcursor->mx_cursor;
+                                       else
+                                               m3 = m2;
+                                       if (m3->mc_pg[0] == mp) {
+                                               m3->mc_pg[0] = mc->mc_pg[0];
+                                       }
+                               }
+                       }
                } else
                        DPUTS("root page doesn't need rebalancing");
                return MDB_SUCCESS;
@@ -4925,7 +5190,7 @@ static int
 mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno)
 {
        uint8_t          flags;
-       int              rc = MDB_SUCCESS, ins_new = 0;
+       int              rc = MDB_SUCCESS, ins_new = 0, new_root = 0;
        indx_t           newindx;
        pgno_t           pgno = 0;
        unsigned int     i, j, split_indx, nkeys, pmax;
@@ -4955,6 +5220,7 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
                mc->mc_db->md_root = pp->mp_pgno;
                DPRINTF("root split! new root = %zu", pp->mp_pgno);
                mc->mc_db->md_depth++;
+               new_root = 1;
 
                /* Add left (implicit) pointer. */
                if ((rc = mdb_node_add(mc, 0, NULL, NULL, mp->mp_pgno, 0)) != MDB_SUCCESS) {
@@ -5102,12 +5368,12 @@ newsep:
                rc = mdb_node_add(&mn, mn.mc_ki[ptop], &sepkey, NULL, rp->mp_pgno, 0);
                mn.mc_top++;
        }
-       if (IS_LEAF2(rp)) {
-               return rc;
-       }
        if (rc != MDB_SUCCESS) {
                return rc;
        }
+       if (IS_LEAF2(rp)) {
+               goto done;
+       }
 
        /* Move half of the keys to the right sibling. */
 
@@ -5183,6 +5449,39 @@ newsep:
        /* return tmp page to freelist */
        copy->mp_next = mc->mc_txn->mt_env->me_dpages;
        mc->mc_txn->mt_env->me_dpages = copy;
+done:
+       {
+               /* Adjust other cursors pointing to mp */
+               MDB_cursor *m2, *m3;
+               MDB_dbi dbi = mc->mc_dbi;
+
+               if (mc->mc_flags & C_SUB)
+                       dbi--;
+
+               for (m2 = mc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) {
+                       if (mc->mc_flags & C_SUB)
+                               m3 = &m2->mc_xcursor->mx_cursor;
+                       else
+                               m3 = m2;
+                       if (new_root) {
+                               /* root split */
+                               for (i=m3->mc_top; i>0; i--) {
+                                       m3->mc_ki[i+1] = m3->mc_ki[i];
+                                       m3->mc_pg[i+1] = m3->mc_pg[i];
+                               }
+                               m3->mc_ki[0] = mc->mc_ki[0];
+                               m3->mc_pg[0] = mc->mc_pg[0];
+                               m3->mc_snum++;
+                               m3->mc_top++;
+                       }
+                       if (m3->mc_pg[mc->mc_top] == mp) {
+                               if (m3->mc_ki[m3->mc_top] >= split_indx) {
+                                       m3->mc_pg[m3->mc_top] = rp;
+                                       m3->mc_ki[m3->mc_top] -= split_indx;
+                               }
+                       }
+               }
+       }
        return rc;
 }
 
index 1672d81..14105b2 100644 (file)
@@ -843,9 +843,6 @@ void mdb_cursor_close(MDB_cursor *cursor);
         * case of the #MDB_SET option, in which the \b key object is unchanged), and
         * the address and length of the data are returned in the object to which \b data
         * refers.
-        * @bug Cursors are not coordinated with write operations. If a cursor in a
-        * write transaction is performing a sequential scan while records are being
-        * inserted or deleted in the same transaction, the cursor will be corrupted.
         * @param[in] cursor A cursor handle returned by #mdb_cursor_open()
         * @param[in,out] key The key for a retrieved item
         * @param[in,out] data The data of a retrieved item