More tweaks to copyfd2
author Howard Chu <hyc@symas.com>
Thu, 3 Jul 2014 21:26:14 +0000 (14:26 -0700)
committer Howard Chu <hyc@symas.com>
Thu, 3 Jul 2014 21:26:14 +0000 (14:26 -0700)
Make sure the writer thread starts and stops when we expect it to.

libraries/liblmdb/mdb.c

index 6997ae9583056203c3f79502126260ab5cf7cb18..609eb9230a0884c09abc64790402dfb5accd0944 100644 (file)
 #define THREAD_RET     DWORD
 #define pthread_t      HANDLE
 #define pthread_mutex_t        HANDLE
+#define pthread_cond_t HANDLE
 #define pthread_key_t  DWORD
 #define pthread_self() GetCurrentThreadId()
 #define pthread_key_create(x,y)        \
 #define pthread_setspecific(x,y)       (TlsSetValue(x,y) ? 0 : ErrCode())
 #define pthread_mutex_unlock(x)        ReleaseMutex(x)
 #define pthread_mutex_lock(x)  WaitForSingleObject(x, INFINITE)
+#define pthread_cond_signal(x) SetEvent(*x)
+/* Wrapped in do/while(0) so the macro is a single statement: callers use it as the unbraced body of `while (...)`, and both the wait and the mutex re-acquire must run on every iteration. */
+#define pthread_cond_wait(cond,mutex)  do { SignalObjectAndWait(*(mutex), *(cond), INFINITE, FALSE); WaitForSingleObject(*(mutex), INFINITE); } while (0)
 #define THREAD_CREATE(thr,start,arg)   thr=CreateThread(NULL,0,start,arg,0,NULL)
 #define THREAD_FINISH(thr)     WaitForSingleObject(thr, INFINITE)
 #define LOCK_MUTEX_R(env)      pthread_mutex_lock((env)->me_rmutex)
@@ -8034,7 +8037,8 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
 #endif
 
 typedef struct mdb_copy {
-       pthread_mutex_t mc_mutex[2];
+       pthread_mutex_t mc_mutex;
+       pthread_cond_t mc_cond;
        char *mc_wbuf[2];
        char *mc_over[2];
        MDB_env *mc_env;
@@ -8044,6 +8048,7 @@ typedef struct mdb_copy {
        pgno_t mc_next_pgno;
        HANDLE mc_fd;
        int mc_status;
+       volatile int mc_new;
        int mc_toggle;
 } mdb_copy;
 
@@ -8061,14 +8066,17 @@ mdb_env_copythr(void *arg)
 #define DO_WRITE(rc, fd, ptr, w2, len) len = write(fd, ptr, w2); rc = (len >= 0)
 #endif
 
-       pthread_mutex_lock(&my->mc_mutex[toggle^1]);
+       pthread_mutex_lock(&my->mc_mutex);
+       my->mc_new = 0;
+       pthread_cond_signal(&my->mc_cond);
        for(;;) {
-               pthread_mutex_lock(&my->mc_mutex[toggle]);
-               pthread_mutex_unlock(&my->mc_mutex[toggle^1]);
-               if (!my->mc_wlen[toggle]) {
-                       pthread_mutex_unlock(&my->mc_mutex[toggle]);
+               while (!my->mc_new)
+                       pthread_cond_wait(&my->mc_cond, &my->mc_mutex);
+               if (my->mc_new < 0) {
+                       my->mc_new = 0;
                        break;
                }
+               my->mc_new = 0;
                wsize = my->mc_wlen[toggle];
                ptr = my->mc_wbuf[toggle];
 again:
@@ -8089,7 +8097,6 @@ again:
                }
                if (rc) {
                        my->mc_status = rc;
-                       pthread_mutex_unlock(&my->mc_mutex[toggle]);
                        break;
                }
                /* If there's an overflow page tail, write it too */
@@ -8101,23 +8108,29 @@ again:
                }
                my->mc_wlen[toggle] = 0;
                toggle ^= 1;
+               pthread_cond_signal(&my->mc_cond);
        }
+       pthread_cond_signal(&my->mc_cond);
+       pthread_mutex_unlock(&my->mc_mutex);
        return (THREAD_RET)0;
 #undef DO_WRITE
 }
 
 static int
-mdb_env_cthr_toggle(mdb_copy *my)
+mdb_env_cthr_toggle(mdb_copy *my, int st)
 {
        int toggle = my->mc_toggle ^ 1;
-
-       pthread_mutex_unlock(&my->mc_mutex[my->mc_toggle]);
-       pthread_mutex_lock(&my->mc_mutex[toggle]);
+       pthread_mutex_lock(&my->mc_mutex);
        if (my->mc_status) {
-               pthread_mutex_unlock(&my->mc_mutex[toggle]);
+               pthread_mutex_unlock(&my->mc_mutex);
                return my->mc_status;
        }
+       while (my->mc_new == 1)
+               pthread_cond_wait(&my->mc_cond, &my->mc_mutex);
+       my->mc_new = st;
        my->mc_toggle = toggle;
+       pthread_cond_signal(&my->mc_cond);
+       pthread_mutex_unlock(&my->mc_mutex);
        return 0;
 }
 
@@ -8188,10 +8201,10 @@ mdb_env_cwalk(mdb_copy *my, pgno_t *pg, int flags)
                                                if (rc)
                                                        goto done;
                                                if (my->mc_wlen[toggle] >= MDB_WBUF) {
-                                                       rc = mdb_env_cthr_toggle(my);
+                                                       rc = mdb_env_cthr_toggle(my, 1);
                                                        if (rc)
                                                                goto done;
-                                                       toggle ^= 1;
+                                                       toggle = my->mc_toggle;
                                                }
                                                mo = (MDB_page *)(my->mc_wbuf[toggle] + my->mc_wlen[toggle]);
                                                memcpy(mo, omp, my->mc_env->me_psize);
@@ -8201,10 +8214,10 @@ mdb_env_cwalk(mdb_copy *my, pgno_t *pg, int flags)
                                                if (omp->mp_pages > 1) {
                                                        my->mc_olen[toggle] = my->mc_env->me_psize * (omp->mp_pages - 1);
                                                        my->mc_over[toggle] = (char *)omp + my->mc_env->me_psize;
-                                                       rc = mdb_env_cthr_toggle(my);
+                                                       rc = mdb_env_cthr_toggle(my, 1);
                                                        if (rc)
                                                                goto done;
-                                                       toggle ^= 1;
+                                                       toggle = my->mc_toggle;
                                                }
                                                memcpy(NODEDATA(ni), &mo->mp_pgno, sizeof(pgno_t));
                                        } else if (ni->mn_flags & F_SUBDATA) {
@@ -8250,10 +8263,10 @@ again:
                        }
                }
                if (my->mc_wlen[toggle] >= MDB_WBUF) {
-                       rc = mdb_env_cthr_toggle(my);
+                       rc = mdb_env_cthr_toggle(my, 1);
                        if (rc)
                                goto done;
-                       toggle ^= 1;
+                       toggle = my->mc_toggle;
                }
                mo = (MDB_page *)(my->mc_wbuf[toggle] + my->mc_wlen[toggle]);
                mdb_page_copy(mo, mp, my->mc_env->me_psize);
@@ -8286,14 +8299,14 @@ mdb_env_copyfd2(MDB_env *env, HANDLE fd)
        int rc;
 
 #ifdef _WIN32
-       my.mc_mutex[0] = CreateMutex(NULL, FALSE, NULL);
-       my.mc_mutex[1] = CreateMutex(NULL, FALSE, NULL);
+       my.mc_mutex = CreateMutex(NULL, FALSE, NULL);
+       my.mc_cond = CreateEvent(NULL, FALSE, FALSE, NULL);
        my.mc_wbuf[0] = _aligned_malloc(MDB_WBUF*2, env->me_psize);
        if (my.mc_wbuf[0] == NULL)
                return errno;
 #else
-       pthread_mutex_init(&my.mc_mutex[0], NULL);
-       pthread_mutex_init(&my.mc_mutex[1], NULL);
+       pthread_mutex_init(&my.mc_mutex, NULL);
+       pthread_cond_init(&my.mc_cond, NULL);
        rc = posix_memalign((void **)&my.mc_wbuf[0], env->me_psize, MDB_WBUF*2);
        if (rc)
                return rc;
@@ -8305,11 +8318,10 @@ mdb_env_copyfd2(MDB_env *env, HANDLE fd)
        my.mc_olen[1] = 0;
        my.mc_next_pgno = 2;
        my.mc_status = 0;
+       my.mc_new = 1;
        my.mc_toggle = 0;
        my.mc_env = env;
        my.mc_fd = fd;
-       pthread_mutex_lock(&my.mc_mutex[0]);
-       THREAD_CREATE(thr, mdb_env_copythr, &my);
 
        /* Do the lock/unlock of the reader mutex before starting the
         * write txn.  Otherwise other read txns could block writers.
@@ -8332,6 +8344,7 @@ mdb_env_copyfd2(MDB_env *env, HANDLE fd)
                }
        }
 
+       THREAD_CREATE(thr, mdb_env_copythr, &my);
        mp = (MDB_page *)my.mc_wbuf[0];
        memset(mp, 0, 2*env->me_psize);
        mp->mp_pgno = 0;
@@ -8368,21 +8381,28 @@ mdb_env_copyfd2(MDB_env *env, HANDLE fd)
        }
        my.mc_wlen[0] = env->me_psize * 2;
        my.mc_txn = txn;
+       pthread_mutex_lock(&my.mc_mutex);
+       while(my.mc_new)
+               pthread_cond_wait(&my.mc_cond, &my.mc_mutex);
+       pthread_mutex_unlock(&my.mc_mutex);
        rc = mdb_env_cwalk(&my, &txn->mt_dbs[1].md_root, 0);
        if (rc == MDB_SUCCESS && my.mc_wlen[my.mc_toggle])
-               rc = mdb_env_cthr_toggle(&my);
-       my.mc_wlen[my.mc_toggle] = 0;
-       pthread_mutex_unlock(&my.mc_mutex[my.mc_toggle]);
+               rc = mdb_env_cthr_toggle(&my, 1);
+       mdb_env_cthr_toggle(&my, -1);
+       pthread_mutex_lock(&my.mc_mutex);
+       while(my.mc_new)
+               pthread_cond_wait(&my.mc_cond, &my.mc_mutex);
+       pthread_mutex_unlock(&my.mc_mutex);
        THREAD_FINISH(thr);
 leave:
        mdb_txn_abort(txn);
 #ifdef _WIN32
-       CloseHandle(my.mc_mutex[1]);
-       CloseHandle(my.mc_mutex[0]);
+       CloseHandle(my.mc_cond);
+       CloseHandle(my.mc_mutex);
        _aligned_free(my.mc_wbuf[0]);
 #else
-       pthread_mutex_destroy(&my.mc_mutex[1]);
-       pthread_mutex_destroy(&my.mc_mutex[0]);
+       pthread_cond_destroy(&my.mc_cond);
+       pthread_mutex_destroy(&my.mc_mutex);
        free(my.mc_wbuf[0]);
 #endif
        return rc;