typedef UV_PLATFORM_SEM_T uv_sem_t;
typedef pthread_cond_t uv_cond_t;
+
+#if defined(__APPLE__) && defined(__MACH__)
+
+typedef struct {
+ unsigned int n;
+ unsigned int count;
+ uv_mutex_t mutex;
+ uv_sem_t turnstile1;
+ uv_sem_t turnstile2;
+} uv_barrier_t;
+
+#else /* defined(__APPLE__) && defined(__MACH__) */
+
+typedef pthread_barrier_t uv_barrier_t;
+
+#endif /* defined(__APPLE__) && defined(__MACH__) */
+
/* Platform-specific definitions for uv_spawn support. */
typedef gid_t uv_gid_t;
typedef uid_t uv_uid_t;
} fallback_;
} uv_rwlock_t;
+typedef struct {
+ unsigned int n;
+ unsigned int count;
+ uv_mutex_t mutex;
+ uv_sem_t turnstile1;
+ uv_sem_t turnstile2;
+} uv_barrier_t;
+
#define UV_ONCE_INIT { 0, NULL }
typedef struct uv_once_s {
#define UV_REQ_FIELDS \
/* public */ \
void* data; \
+ /* read-only */ \
+ uv_req_type type; \
/* private */ \
ngx_queue_t active_queue; \
UV_REQ_PRIVATE_FIELDS \
- /* read-only */ \
- uv_req_type type; \
/* Abstract base class of all requests. */
struct uv_req_s {
*/
unsigned int flags;
/*
- * Libuv can change the child process' user/group id. This happens only when
- * the appropriate bits are set in the flags fields. This is not supported on
- * windows; uv_spawn() will fail and set the error to UV_ENOTSUP.
- */
- uv_uid_t uid;
- uv_gid_t gid;
-
- /*
* The `stdio` field points to an array of uv_stdio_container_t structs that
* describe the file descriptors that will be made available to the child
* process. The convention is that stdio[0] points to stdin, fd 1 is used for
*/
int stdio_count;
uv_stdio_container_t* stdio;
+ /*
+ * Libuv can change the child process' user/group id. This happens only when
+ * the appropriate bits are set in the flags fields. This is not supported on
+ * windows; uv_spawn() will fail and set the error to UV_ENOTSUP.
+ */
+ uv_uid_t uid;
+ uv_gid_t gid;
} uv_process_options_t;
/*
UV_EXTERN int uv_cond_timedwait(uv_cond_t* cond, uv_mutex_t* mutex,
uint64_t timeout);
+UV_EXTERN int uv_barrier_init(uv_barrier_t* barrier, unsigned int count);
+UV_EXTERN void uv_barrier_destroy(uv_barrier_t* barrier);
+UV_EXTERN void uv_barrier_wait(uv_barrier_t* barrier);
+
/* Runs a function once and only once. Concurrent calls to uv_once() with the
* same guard will block all callers except one (it's unspecified which one).
* The guard should be initialized statically with the UV_ONCE_INIT macro.
}
-static ssize_t uv__fs_pwrite(uv_fs_t* req) {
-#if defined(__APPLE__)
- /* Serialize writes on OS X, concurrent pwrite() calls result in data loss.
- * We can't use a per-file descriptor lock, the descriptor may be a dup().
- */
- static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
- ssize_t r;
-
- pthread_mutex_lock(&lock);
- r = pwrite(req->file, req->buf, req->len, req->off);
- pthread_mutex_unlock(&lock);
-
- return r;
-#else
- return pwrite(req->file, req->buf, req->len, req->off);
-#endif
-}
-
static ssize_t uv__fs_read(uv_fs_t* req) {
if (req->off < 0)
return read(req->file, req->buf, req->len);
static ssize_t uv__fs_write(uv_fs_t* req) {
+ ssize_t r;
+
+ /* Serialize writes on OS X, concurrent write() and pwrite() calls result in
+ * data loss. We can't use a per-file descriptor lock, the descriptor may be
+ * a dup().
+ */
+#if defined(__APPLE__)
+ static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
+ pthread_mutex_lock(&lock);
+#endif
+
if (req->off < 0)
- return write(req->file, req->buf, req->len);
+ r = write(req->file, req->buf, req->len);
else
- return uv__fs_pwrite(req);
+ r = pwrite(req->file, req->buf, req->len, req->off);
+
+#if defined(__APPLE__)
+ pthread_mutex_unlock(&lock);
+#endif
+
+ return r;
}
}
#endif /* defined(__APPLE__) && defined(__MACH__) */
+
+
+#if defined(__APPLE__) && defined(__MACH__)
+
+int uv_barrier_init(uv_barrier_t* barrier, unsigned int count) {
+ barrier->n = count;
+ barrier->count = 0;
+
+ if (uv_mutex_init(&barrier->mutex))
+ return -1;
+
+ if (uv_sem_init(&barrier->turnstile1, 0))
+ goto error2;
+
+ if (uv_sem_init(&barrier->turnstile2, 1))
+ goto error;
+
+ return 0;
+
+error:
+ uv_sem_destroy(&barrier->turnstile1);
+error2:
+ uv_mutex_destroy(&barrier->mutex);
+ return -1;
+
+}
+
+
+void uv_barrier_destroy(uv_barrier_t* barrier) {
+ uv_sem_destroy(&barrier->turnstile2);
+ uv_sem_destroy(&barrier->turnstile1);
+ uv_mutex_destroy(&barrier->mutex);
+}
+
+
+void uv_barrier_wait(uv_barrier_t* barrier) {
+ uv_mutex_lock(&barrier->mutex);
+ if (++barrier->count == barrier->n) {
+ uv_sem_wait(&barrier->turnstile2);
+ uv_sem_post(&barrier->turnstile1);
+ }
+ uv_mutex_unlock(&barrier->mutex);
+
+ uv_sem_wait(&barrier->turnstile1);
+ uv_sem_post(&barrier->turnstile1);
+
+ uv_mutex_lock(&barrier->mutex);
+ if (--barrier->count == 0) {
+ uv_sem_wait(&barrier->turnstile1);
+ uv_sem_post(&barrier->turnstile2);
+ }
+ uv_mutex_unlock(&barrier->mutex);
+
+ uv_sem_wait(&barrier->turnstile2);
+ uv_sem_post(&barrier->turnstile2);
+}
+
+#else /* !(defined(__APPLE__) && defined(__MACH__)) */
+
+int uv_barrier_init(uv_barrier_t* barrier, unsigned int count) {
+ if (pthread_barrier_init(barrier, NULL, count))
+ return -1;
+ else
+ return 0;
+}
+
+
+void uv_barrier_destroy(uv_barrier_t* barrier) {
+ if (pthread_barrier_destroy(barrier))
+ abort();
+}
+
+
+void uv_barrier_wait(uv_barrier_t* barrier) {
+ int r = pthread_barrier_wait(barrier);
+ if (r && r != PTHREAD_BARRIER_SERIAL_THREAD)
+ abort();
+}
+
+#endif /* defined(__APPLE__) && defined(__MACH__) */
__attribute__((destructor))
static void cleanup(void) {
unsigned int i;
- int err;
if (initialized == 0)
return;
post(&exit_message);
- for (i = 0; i < ARRAY_SIZE(threads); i++) {
- err = pthread_join(threads[i], NULL);
- assert(err == 0 || err == ESRCH);
- (void) err; /* Silence compiler warning in release builds. */
- }
+ for (i = 0; i < ARRAY_SIZE(threads); i++)
+ if (pthread_join(threads[i], NULL))
+ abort();
+
+ initialized = 0;
}
else
return uv_cond_fallback_timedwait(cond, mutex, timeout);
}
+
+
+int uv_barrier_init(uv_barrier_t* barrier, unsigned int count) {
+ barrier->n = count;
+ barrier->count = 0;
+
+ if (uv_mutex_init(&barrier->mutex))
+ return -1;
+
+ if (uv_sem_init(&barrier->turnstile1, 0))
+ goto error2;
+
+ if (uv_sem_init(&barrier->turnstile2, 1))
+ goto error;
+
+ return 0;
+
+error:
+ uv_sem_destroy(&barrier->turnstile1);
+error2:
+ uv_mutex_destroy(&barrier->mutex);
+ return -1;
+
+}
+
+
+void uv_barrier_destroy(uv_barrier_t* barrier) {
+ uv_sem_destroy(&barrier->turnstile2);
+ uv_sem_destroy(&barrier->turnstile1);
+ uv_mutex_destroy(&barrier->mutex);
+}
+
+
+void uv_barrier_wait(uv_barrier_t* barrier) {
+ uv_mutex_lock(&barrier->mutex);
+ if (++barrier->count == barrier->n) {
+ uv_sem_wait(&barrier->turnstile2);
+ uv_sem_post(&barrier->turnstile1);
+ }
+ uv_mutex_unlock(&barrier->mutex);
+
+ uv_sem_wait(&barrier->turnstile1);
+ uv_sem_post(&barrier->turnstile1);
+
+ uv_mutex_lock(&barrier->mutex);
+ if (--barrier->count == 0) {
+ uv_sem_wait(&barrier->turnstile1);
+ uv_sem_post(&barrier->turnstile2);
+ }
+ uv_mutex_unlock(&barrier->mutex);
+
+ uv_sem_wait(&barrier->turnstile2);
+ uv_sem_post(&barrier->turnstile2);
+}
#include <malloc.h>
#include <stdio.h>
#include <process.h>
-#include <crtdbg.h>
+#if !defined(__MINGW32__)
+# include <crtdbg.h>
+#endif
+
#include "task.h"
#include "runner.h"
/* Disable the "application crashed" popup. */
SetErrorMode(SEM_FAILCRITICALERRORS | SEM_NOGPFAULTERRORBOX |
SEM_NOOPENFILEERRORBOX);
+#if !defined(__MINGW32__)
_CrtSetReportMode(_CRT_ASSERT, _CRTDBG_MODE_DEBUG);
_CrtSetReportMode(_CRT_ERROR, _CRTDBG_MODE_DEBUG);
+#endif
_setmode(0, _O_BINARY);
_setmode(1, _O_BINARY);
--- /dev/null
+/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to
+ * deal in the Software without restriction, including without limitation the
+ * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ * IN THE SOFTWARE.
+ */
+
+#include "uv.h"
+#include "task.h"
+
+#include <string.h>
+#include <errno.h>
+
+typedef struct {
+ uv_barrier_t barrier;
+ int delay;
+ volatile int posted;
+} worker_config;
+
+
+static void worker(void* arg) {
+ worker_config* c = arg;
+
+ if (c->delay)
+ uv_sleep(c->delay);
+
+ uv_barrier_wait(&c->barrier);
+}
+
+
+TEST_IMPL(barrier_1) {
+ uv_thread_t thread;
+ worker_config wc;
+
+ memset(&wc, 0, sizeof(wc));
+
+ ASSERT(0 == uv_barrier_init(&wc.barrier, 2));
+ ASSERT(0 == uv_thread_create(&thread, worker, &wc));
+
+ uv_sleep(100);
+ uv_barrier_wait(&wc.barrier);
+
+ ASSERT(0 == uv_thread_join(&thread));
+ uv_barrier_destroy(&wc.barrier);
+
+ return 0;
+}
+
+
+TEST_IMPL(barrier_2) {
+ uv_thread_t thread;
+ worker_config wc;
+
+ memset(&wc, 0, sizeof(wc));
+ wc.delay = 100;
+
+ ASSERT(0 == uv_barrier_init(&wc.barrier, 2));
+ ASSERT(0 == uv_thread_create(&thread, worker, &wc));
+
+ uv_barrier_wait(&wc.barrier);
+
+ ASSERT(0 == uv_thread_join(&thread));
+ uv_barrier_destroy(&wc.barrier);
+
+ return 0;
+}
+
+
+TEST_IMPL(barrier_3) {
+ uv_thread_t thread;
+ worker_config wc;
+
+ memset(&wc, 0, sizeof(wc));
+
+ ASSERT(0 == uv_barrier_init(&wc.barrier, 2));
+ ASSERT(0 == uv_thread_create(&thread, worker, &wc));
+
+ uv_barrier_wait(&wc.barrier);
+
+ ASSERT(0 == uv_thread_join(&thread));
+ uv_barrier_destroy(&wc.barrier);
+
+ return 0;
+}
--- /dev/null
+/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to
+ * deal in the Software without restriction, including without limitation the
+ * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ * IN THE SOFTWARE.
+ */
+
+#include "uv.h"
+#include "task.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+
+#define MAX_CONSUMERS 32
+#define MAX_LOOPS 1000
+
+struct buffer_s {
+ ngx_queue_t queue;
+ int data;
+};
+typedef struct buffer_s buffer_t;
+
+static ngx_queue_t queue;
+static uv_mutex_t mutex;
+static uv_cond_t empty;
+static uv_cond_t full;
+
+static volatile int finished_consumers = 0;
+
+
+static void produce(int value) {
+ buffer_t* buf;
+
+ buf = malloc(sizeof(*buf));
+ ngx_queue_init(&buf->queue);
+ buf->data = value;
+ ngx_queue_insert_tail(&queue, &buf->queue);
+}
+
+
+static int consume(void) {
+ ngx_queue_t* q;
+ buffer_t* buf;
+ int data;
+
+ ASSERT(!ngx_queue_empty(&queue));
+ q = ngx_queue_last(&queue);
+ ngx_queue_remove(q);
+
+ buf = ngx_queue_data(q, buffer_t, queue);
+ data = buf->data;
+ free(buf);
+
+ return data;
+}
+
+
+static void producer(void* arg) {
+ int i;
+
+ (void) arg;
+
+ for (i = 0; i < MAX_LOOPS * MAX_CONSUMERS; i++) {
+ uv_mutex_lock(&mutex);
+ while(!ngx_queue_empty(&queue))
+ uv_cond_wait(&empty, &mutex);
+ produce(i);
+ uv_cond_signal(&full);
+ uv_mutex_unlock(&mutex);
+ }
+
+ LOGF("finished_consumers: %d\n", finished_consumers);
+ ASSERT(finished_consumers == MAX_CONSUMERS);
+}
+
+
+static void consumer(void* arg) {
+ int i;
+ int value;
+
+ (void) arg;
+
+ for (i = 0; i < MAX_LOOPS; i++) {
+ uv_mutex_lock(&mutex);
+ while (ngx_queue_empty(&queue))
+ uv_cond_wait(&full, &mutex);
+ value = consume();
+ ASSERT(value < MAX_LOOPS * MAX_CONSUMERS);
+ uv_cond_signal(&empty);
+ uv_mutex_unlock(&mutex);
+ }
+
+ finished_consumers++;
+}
+
+
+TEST_IMPL(consumer_producer) {
+ int i;
+ uv_thread_t cthreads[MAX_CONSUMERS];
+ uv_thread_t pthread;
+
+ ngx_queue_init(&queue);
+ ASSERT(0 == uv_mutex_init(&mutex));
+ ASSERT(0 == uv_cond_init(&empty));
+ ASSERT(0 == uv_cond_init(&full));
+
+ for (i = 0; i < MAX_CONSUMERS; i++) {
+ ASSERT(0 == uv_thread_create(&cthreads[i], consumer, NULL));
+ }
+
+ ASSERT(0 == uv_thread_create(&pthread, producer, NULL));
+
+ for (i = 0; i < MAX_CONSUMERS; i++) {
+ ASSERT(0 == uv_thread_join(&cthreads[i]));
+ }
+
+ ASSERT(0 == uv_thread_join(&pthread));
+ uv_cond_destroy(&empty);
+ uv_cond_destroy(&full);
+ uv_mutex_destroy(&mutex);
+
+ return 0;
+}
TEST_DECLARE (platform_output)
TEST_DECLARE (callback_order)
TEST_DECLARE (run_once)
+TEST_DECLARE (barrier_1)
+TEST_DECLARE (barrier_2)
+TEST_DECLARE (barrier_3)
TEST_DECLARE (condvar_1)
TEST_DECLARE (condvar_2)
TEST_DECLARE (condvar_3)
TEST_ENTRY (callback_order)
#endif
TEST_ENTRY (run_once)
+ TEST_ENTRY (barrier_1)
+ TEST_ENTRY (barrier_2)
+ TEST_ENTRY (barrier_3)
TEST_ENTRY (condvar_1)
TEST_ENTRY (condvar_2)
TEST_ENTRY (condvar_3)
'test/test-mutexes.c',
'test/test-signal.c',
'test/test-thread.c',
+ 'test/test-barrier.c',
'test/test-condvar.c',
'test/test-condvar-consumer-producer.c',
'test/test-timer-again.c',