Remove mdb data races. Use (txnid_t)-1 as "no ID".
authorHallvard Furuseth <hallvard@openldap.org>
Mon, 17 Sep 2012 13:42:14 +0000 (15:42 +0200)
committerHallvard Furuseth <hallvard@openldap.org>
Mon, 17 Sep 2012 13:42:14 +0000 (15:42 +0200)
Avoid race between numreaders++ and reading numreaders at cleanup. Make
the un-mutexed reset of reader table entry, atomic: Reset mr_pid only.

Instead check mr_pid != 0 in mdb_page_alloc()'s scan for readers.
(txnid_t)-1 as "no ID"-mark avoids a check for mr_txnid != 0.
The scan can stop when seeing an old reader.

libraries/libmdb/mdb.c

index 37403fbcb05a9ea9c58bf9236da178c3207f7fe4..6b759d702aa8c11e41000e65185127424df93d2d 100644 (file)
@@ -433,7 +433,7 @@ typedef uint16_t     indx_t;
         *      lock file.
         */
 typedef struct MDB_rxbody {
-       /**     The current Transaction ID when this transaction began.
+       /**     Current Transaction ID when this transaction began, or (txnid_t)-1.
         *      Multiple readers that start at the same time will probably have the
         *      same ID here. Again, it's not important to exclude them from
         *      anything; all we need to know is which version of the DB they
@@ -904,6 +904,7 @@ struct MDB_env {
        uint32_t        me_flags;               /**< @ref mdb_env */
        unsigned int    me_psize;       /**< size of a page, from #GET_PAGESIZE */
        unsigned int    me_maxreaders;  /**< size of the reader table */
+       unsigned int    me_numreaders;  /**< max numreaders set by this env */
        MDB_dbi         me_numdbs;              /**< number of DBs opened */
        MDB_dbi         me_maxdbs;              /**< size of the DB table */
        pid_t           me_pid;         /**< process ID of this env */
@@ -1233,10 +1234,12 @@ mdb_page_alloc(MDB_cursor *mc, int num, MDB_page **mp)
                if (!txn->mt_env->me_pghead &&
                        txn->mt_dbs[FREE_DBI].md_root != P_INVALID) {
                        /* See if there's anything in the free DB */
+                       int j;
+                       MDB_reader *r;
                        MDB_cursor m2;
                        MDB_node *leaf;
                        MDB_val data;
-                       txnid_t *kptr, oldest, last;
+                       txnid_t *kptr, last;
 
                        mdb_cursor_init(&m2, txn, FREE_DBI, NULL);
                        if (!txn->mt_env->me_pgfirst) {
@@ -1259,17 +1262,15 @@ again:
                                last = *(txnid_t *)key.mv_data;
                        }
 
-                       {
-                               unsigned int i;
-                               oldest = txn->mt_txnid - 1;
-                               for (i=0; i<txn->mt_env->me_txns->mti_numreaders; i++) {
-                                       txnid_t mr = txn->mt_env->me_txns->mti_readers[i].mr_txnid;
-                                       if (mr && mr < oldest)
-                                               oldest = mr;
-                               }
+                       /* Unusable if referred by a meta page or reader... */
+                       j = 1;
+                       if (last < txn->mt_txnid-1) {
+                               j = txn->mt_env->me_txns->mti_numreaders;
+                               r = txn->mt_env->me_txns->mti_readers + j;
+                               for (j = -j; j && (last<r[j].mr_txnid || !r[j].mr_pid); j++) ;
                        }
 
-                       if (oldest > last) {
+                       if (!j) {
                                /* It's usable, grab it.
                                 */
                                MDB_oldpages *mop;
@@ -1641,6 +1642,8 @@ mdb_txn_renew0(MDB_txn *txn)
                        env->me_txns->mti_readers[i].mr_tid = tid;
                        if (i >= env->me_txns->mti_numreaders)
                                env->me_txns->mti_numreaders = i+1;
+                       /* Save numreaders for un-mutexed mdb_env_close() */
+                       env->me_numreaders = env->me_txns->mti_numreaders;
                        UNLOCK_MUTEX_R(env);
                        r = &env->me_txns->mti_readers[i];
                        pthread_setspecific(env->me_txkey, r);
@@ -1788,7 +1791,7 @@ mdb_txn_reset0(MDB_txn *txn)
        MDB_env *env = txn->mt_env;
 
        if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
-               txn->mt_u.reader->mr_txnid = 0;
+               txn->mt_u.reader->mr_txnid = (txnid_t)-1;
        } else {
                MDB_oldpages *mop;
                MDB_page *dp;
@@ -2668,9 +2671,7 @@ mdb_env_reader_dest(void *ptr)
 {
        MDB_reader *reader = ptr;
 
-       reader->mr_txnid = 0;
        reader->mr_pid = 0;
-       reader->mr_tid = 0;
 }
 
 #ifdef _WIN32
@@ -3259,7 +3260,7 @@ mdb_env_close(MDB_env *env)
        if (env->me_txns) {
                pid_t pid = env->me_pid;
                unsigned int i;
-               for (i=0; i<env->me_txns->mti_numreaders; i++)
+               for (i=0; i<env->me_numreaders; i++)
                        if (env->me_txns->mti_readers[i].mr_pid == pid)
                                env->me_txns->mti_readers[i].mr_pid = 0;
 #ifdef _WIN32