}
-TaskQueue::TaskQueue() {
- int err;
-
- for (size_t i = 0; i < ARRAY_SIZE(ring_); i += 1)
- ring_[i] = nullptr;
-
- read_off_ = 0;
- write_off_ = 0;
-
- err = uv_sem_init(&sem_, 0);
- CHECK_EQ(err, 0);
-
- err = uv_cond_init(&cond_);
- CHECK_EQ(err, 0);
-
- err = uv_mutex_init(&mutex_);
- CHECK_EQ(err, 0);
+TaskQueue::TaskQueue() : read_off_(0), write_off_(0) {
+ CHECK_EQ(0, uv_cond_init(&read_cond_));
+ CHECK_EQ(0, uv_cond_init(&write_cond_));
+ CHECK_EQ(0, uv_mutex_init(&mutex_));
}
TaskQueue::~TaskQueue() {
+ uv_mutex_lock(&mutex_);
CHECK_EQ(read_off_, write_off_);
- uv_sem_destroy(&sem_);
- uv_cond_destroy(&cond_);
+ uv_mutex_unlock(&mutex_);
+ uv_cond_destroy(&read_cond_);
+ uv_cond_destroy(&write_cond_);
uv_mutex_destroy(&mutex_);
}
void TaskQueue::Push(Task* task) {
uv_mutex_lock(&mutex_);
- // Wait for empty cell
- while (ring_[write_off_] != nullptr)
- uv_cond_wait(&cond_, &mutex_);
+ while (can_write() == false)
+ uv_cond_wait(&write_cond_, &mutex_); // Wait until there is a free slot.
ring_[write_off_] = task;
- write_off_++;
- write_off_ &= kRingMask;
+ write_off_ = next(write_off_);
+ uv_cond_signal(&read_cond_);
uv_mutex_unlock(&mutex_);
-
- uv_sem_post(&sem_);
}
Task* TaskQueue::Shift() {
- uv_sem_wait(&sem_);
-
uv_mutex_lock(&mutex_);
- Task* task = ring_[read_off_];
- ring_[read_off_] = nullptr;
- uv_cond_signal(&cond_);
- read_off_++;
- read_off_ &= kRingMask;
+ while (can_read() == false)
+ uv_cond_wait(&read_cond_, &mutex_);
+
+ Task* task = ring_[read_off_];
+ if (can_write() == false)
+ uv_cond_signal(&write_cond_); // Signal waiters that we freed up a slot.
+ read_off_ = next(read_off_);
uv_mutex_unlock(&mutex_);
return task;
}
+unsigned int TaskQueue::next(unsigned int n) {
+ return (n + 1) % ARRAY_SIZE(TaskQueue::ring_);
+}
+
+
+bool TaskQueue::can_read() const {
+ return read_off_ != write_off_;
+}
+
+
+// The read pointer chases the write pointer in the circular queue.
+// This method checks that the write pointer hasn't advanced so much
+// that it has gone full circle and caught up with the read pointer.
+//
+// can_write() returns false when there is an empty slot but the read pointer
+// points to the first element and the write pointer to the last element.
+// That should be rare enough that it is not worth the extra bookkeeping
+// to work around that. It's not harmful either, just mildly inefficient.
+bool TaskQueue::can_write() const {
+ return next(write_off_) != read_off_;
+}
+
+
} // namespace node