src: simplify v8 thread pool implementation
authorBen Noordhuis <info@bnoordhuis.nl>
Sat, 25 Oct 2014 06:45:55 +0000 (08:45 +0200)
committerBen Noordhuis <info@bnoordhuis.nl>
Tue, 28 Oct 2014 11:56:46 +0000 (12:56 +0100)
This commit drops the semaphore in exchange for a second condition
variable and makes the task ring an array member instead of allocating
it on the heap.  That in turn makes size calculations a little easier
because of the array's fixed size.

PR-URL: https://github.com/node-forward/node/pull/34
Reviewed-By: Fedor Indutny <fedor@indutny.com>
src/node_v8_platform.cc
src/node_v8_platform.h

index aed4661..ad46e0c 100644 (file)
@@ -95,30 +95,19 @@ void Platform::WorkerBody(void* arg) {
 }
 
 
-TaskQueue::TaskQueue() {
-  int err;
-
-  for (size_t i = 0; i < ARRAY_SIZE(ring_); i += 1)
-    ring_[i] = nullptr;
-
-  read_off_ = 0;
-  write_off_ = 0;
-
-  err = uv_sem_init(&sem_, 0);
-  CHECK_EQ(err, 0);
-
-  err = uv_cond_init(&cond_);
-  CHECK_EQ(err, 0);
-
-  err = uv_mutex_init(&mutex_);
-  CHECK_EQ(err, 0);
+TaskQueue::TaskQueue() : read_off_(0), write_off_(0) {
+  CHECK_EQ(0, uv_cond_init(&read_cond_));
+  CHECK_EQ(0, uv_cond_init(&write_cond_));
+  CHECK_EQ(0, uv_mutex_init(&mutex_));
 }
 
 
 TaskQueue::~TaskQueue() {
+  uv_mutex_lock(&mutex_);
   CHECK_EQ(read_off_, write_off_);
-  uv_sem_destroy(&sem_);
-  uv_cond_destroy(&cond_);
+  uv_mutex_unlock(&mutex_);
+  uv_cond_destroy(&read_cond_);
+  uv_cond_destroy(&write_cond_);
   uv_mutex_destroy(&mutex_);
 }
 
@@ -126,33 +115,53 @@ TaskQueue::~TaskQueue() {
 void TaskQueue::Push(Task* task) {
   uv_mutex_lock(&mutex_);
 
-  // Wait for empty cell
-  while (ring_[write_off_] != nullptr)
-    uv_cond_wait(&cond_, &mutex_);
+  while (can_write() == false)
+    uv_cond_wait(&write_cond_, &mutex_);  // Wait until there is a free slot.
 
   ring_[write_off_] = task;
-  write_off_++;
-  write_off_ &= kRingMask;
+  write_off_ = next(write_off_);
+  uv_cond_signal(&read_cond_);
   uv_mutex_unlock(&mutex_);
-
-  uv_sem_post(&sem_);
 }
 
 
 Task* TaskQueue::Shift() {
-  uv_sem_wait(&sem_);
-
   uv_mutex_lock(&mutex_);
-  Task* task = ring_[read_off_];
-  ring_[read_off_] = nullptr;
-  uv_cond_signal(&cond_);
 
-  read_off_++;
-  read_off_ &= kRingMask;
+  while (can_read() == false)
+    uv_cond_wait(&read_cond_, &mutex_);
+
+  Task* task = ring_[read_off_];
+  if (can_write() == false)
+    uv_cond_signal(&write_cond_);  // Signal waiters that we freed up a slot.
+  read_off_ = next(read_off_);
   uv_mutex_unlock(&mutex_);
 
   return task;
 }
 
 
+unsigned int TaskQueue::next(unsigned int n) {
+  return (n + 1) % ARRAY_SIZE(TaskQueue::ring_);
+}
+
+
+bool TaskQueue::can_read() const {
+  return read_off_ != write_off_;
+}
+
+
+// The read pointer chases the write pointer in the circular queue.
+// This method checks that the write pointer hasn't advanced so much
+// that it has gone full circle and caught up with the read pointer.
+//
+// can_write() returns false when there is an empty slot but the read pointer
+// points to the first element and the write pointer to the last element.
+// That should be rare enough that it is not worth the extra bookkeeping
+// to work around that.  It's not harmful either, just mildly inefficient.
+bool TaskQueue::can_write() const {
+  return next(write_off_) != read_off_;
+}
+
+
 }  // namespace node
index a51c3a1..44acd62 100644 (file)
@@ -36,18 +36,15 @@ class TaskQueue {
   v8::Task* Shift();
 
  private:
-  static const unsigned int kRingSize = 1024;
-  static const unsigned int kRingMask = kRingSize - 1;
-
-  static_assert(kRingSize == (kRingSize & ~kRingMask),
-                "kRingSize is not a power of two");
-
-  uv_sem_t sem_;
-  uv_cond_t cond_;
+  static unsigned int next(unsigned int n);
+  bool can_read() const;
+  bool can_write() const;
+  uv_cond_t read_cond_;
+  uv_cond_t write_cond_;
   uv_mutex_t mutex_;
   unsigned int read_off_;
   unsigned int write_off_;
-  v8::Task* ring_[kRingSize];
+  v8::Task* ring_[1024];
 };
 
 class Platform : public v8::Platform {