Fix data race in SamplingCircularQueue
authoryurys@chromium.org <yurys@chromium.org@ce2b1a6d-e550-0410-aec6-3dcde31c8c00>
Thu, 18 Jul 2013 13:42:04 +0000 (13:42 +0000)
committeryurys@chromium.org <yurys@chromium.org@ce2b1a6d-e550-0410-aec6-3dcde31c8c00>
Thu, 18 Jul 2013 13:42:04 +0000 (13:42 +0000)
This change fixes data race described in the bug by adding Acquire_Load to SamplingCircularQueue::StartDequeue and Acquire_Store to SamplingCircularQueue::Enqueue.

Also the queue implementation imposed a constraint on the records it stored: the first AtomicWord in each record was a marker. For that purpose TickSampleEventRecord had filter field of type int. This approach is error prone, e.g. on x64 sizeof(AtomicWord) is 8 while sizeof(int) is 4. Moreover the queue needs such marker only at the beginning of chunk. I changed the queue so that it stores the marker explicitly as the first Cell in chunk and removed the filter field.

BUG=251218
R=loislo@chromium.org, yangguo@chromium.org

Review URL: https://codereview.chromium.org/19642002

git-svn-id: http://v8.googlecode.com/svn/branches/bleeding_edge@15750 ce2b1a6d-e550-0410-aec6-3dcde31c8c00

src/circular-queue-inl.h
src/circular-queue.cc
src/circular-queue.h
src/cpu-profiler.h
test/cctest/test-circular-queue.cc

index 373bf60..b48070a 100644 (file)
@@ -35,7 +35,16 @@ namespace internal {
 
 
 void* SamplingCircularQueue::Enqueue() {
-  WrapPositionIfNeeded(&producer_pos_->enqueue_pos);
+  if (producer_pos_->enqueue_pos == producer_pos_->next_chunk_pos) {
+    if (producer_pos_->enqueue_pos == buffer_ + buffer_size_) {
+      producer_pos_->next_chunk_pos = buffer_;
+      producer_pos_->enqueue_pos = buffer_;
+    }
+    Acquire_Store(producer_pos_->next_chunk_pos, kEnqueueStarted);
+    // Skip marker.
+    producer_pos_->enqueue_pos += 1;
+    producer_pos_->next_chunk_pos += chunk_size_;
+  }
   void* result = producer_pos_->enqueue_pos;
   producer_pos_->enqueue_pos += record_size_;
   return result;
@@ -44,7 +53,7 @@ void* SamplingCircularQueue::Enqueue() {
 
 void SamplingCircularQueue::WrapPositionIfNeeded(
     SamplingCircularQueue::Cell** pos) {
-  if (**pos == kEnd) *pos = buffer_;
+  if (*pos == buffer_ + buffer_size_) *pos = buffer_;
 }
 
 
index 928c3f0..475b234 100644 (file)
@@ -33,26 +33,22 @@ namespace v8 {
 namespace internal {
 
 
-SamplingCircularQueue::SamplingCircularQueue(int record_size_in_bytes,
-                                             int desired_chunk_size_in_bytes,
+SamplingCircularQueue::SamplingCircularQueue(size_t record_size_in_bytes,
+                                             size_t desired_chunk_size_in_bytes,
                                              int buffer_size_in_chunks)
     : record_size_(record_size_in_bytes / sizeof(Cell)),
       chunk_size_in_bytes_(desired_chunk_size_in_bytes / record_size_in_bytes *
-                        record_size_in_bytes),
+                        record_size_in_bytes + sizeof(Cell)),
       chunk_size_(chunk_size_in_bytes_ / sizeof(Cell)),
       buffer_size_(chunk_size_ * buffer_size_in_chunks),
-      // The distance ensures that producer and consumer never step on
-      // each other's chunks and helps eviction of produced data from
-      // the CPU cache (having that chunk size is bigger than the cache.)
-      producer_consumer_distance_(2 * chunk_size_),
-      buffer_(NewArray<Cell>(buffer_size_ + 1)) {
+      buffer_(NewArray<Cell>(buffer_size_)) {
+  ASSERT(record_size_ * sizeof(Cell) == record_size_in_bytes);
+  ASSERT(chunk_size_ * sizeof(Cell) == chunk_size_in_bytes_);
   ASSERT(buffer_size_in_chunks > 2);
-  // Clean up the whole buffer to avoid encountering a random kEnd
-  // while enqueuing.
-  for (int i = 0; i < buffer_size_; ++i) {
+  // Mark all chunks as clear.
+  for (int i = 0; i < buffer_size_; i += chunk_size_) {
     buffer_[i] = kClear;
   }
-  buffer_[buffer_size_] = kEnd;
 
   // Layout producer and consumer position pointers each on their own
   // cache lines to avoid cache lines thrashing due to simultaneous
@@ -67,6 +63,7 @@ SamplingCircularQueue::SamplingCircularQueue(int record_size_in_bytes,
 
   producer_pos_ = reinterpret_cast<ProducerPosition*>(
       RoundUp(positions_, kProcessorCacheLineSize));
+  producer_pos_->next_chunk_pos = buffer_;
   producer_pos_->enqueue_pos = buffer_;
 
   consumer_pos_ = reinterpret_cast<ConsumerPosition*>(
@@ -74,7 +71,11 @@ SamplingCircularQueue::SamplingCircularQueue(int record_size_in_bytes,
   ASSERT(reinterpret_cast<byte*>(consumer_pos_ + 1) <=
          positions_ + positions_size);
   consumer_pos_->dequeue_chunk_pos = buffer_;
-  consumer_pos_->dequeue_chunk_poll_pos = buffer_ + producer_consumer_distance_;
+  // The distance ensures that producer and consumer never step on
+  // each other's chunks and helps eviction of produced data from
+  // the CPU cache (having that chunk size is bigger than the cache.)
+  const int producer_consumer_distance = (2 * chunk_size_);
+  consumer_pos_->dequeue_chunk_poll_pos = buffer_ + producer_consumer_distance;
   consumer_pos_->dequeue_pos = NULL;
 }
 
@@ -89,9 +90,11 @@ void* SamplingCircularQueue::StartDequeue() {
   if (consumer_pos_->dequeue_pos != NULL) {
     return consumer_pos_->dequeue_pos;
   } else {
-    if (*consumer_pos_->dequeue_chunk_poll_pos != kClear) {
-      consumer_pos_->dequeue_pos = consumer_pos_->dequeue_chunk_pos;
-      consumer_pos_->dequeue_end_pos = consumer_pos_->dequeue_pos + chunk_size_;
+    if (Acquire_Load(consumer_pos_->dequeue_chunk_poll_pos) != kClear) {
+      // Skip marker.
+      consumer_pos_->dequeue_pos = consumer_pos_->dequeue_chunk_pos + 1;
+      consumer_pos_->dequeue_end_pos =
+          consumer_pos_->dequeue_chunk_pos + chunk_size_;
       return consumer_pos_->dequeue_pos;
     } else {
       return NULL;
index 73afc68..239ffe1 100644 (file)
@@ -45,8 +45,8 @@ namespace internal {
 class SamplingCircularQueue {
  public:
   // Executed on the application thread.
-  SamplingCircularQueue(int record_size_in_bytes,
-                        int desired_chunk_size_in_bytes,
+  SamplingCircularQueue(size_t record_size_in_bytes,
+                        size_t desired_chunk_size_in_bytes,
                         int buffer_size_in_chunks);
   ~SamplingCircularQueue();
 
@@ -67,12 +67,16 @@ class SamplingCircularQueue {
   void FlushResidualRecords();
 
   typedef AtomicWord Cell;
-  // Reserved values for the first cell of a record.
-  static const Cell kClear = 0;  // Marks clean (processed) chunks.
-  static const Cell kEnd = -1;   // Marks the end of the buffer.
 
  private:
+  // Reserved values for the chunk marker (first Cell in each chunk).
+  enum {
+    kClear,          // Marks clean (processed) chunks.
+    kEnqueueStarted  // Marks chunks where enqueue started.
+  };
+
   struct ProducerPosition {
+    Cell* next_chunk_pos;
     Cell* enqueue_pos;
   };
   struct ConsumerPosition {
@@ -85,10 +89,9 @@ class SamplingCircularQueue {
   INLINE(void WrapPositionIfNeeded(Cell** pos));
 
   const int record_size_;
-  const int chunk_size_in_bytes_;
+  const size_t chunk_size_in_bytes_;
   const int chunk_size_;
   const int buffer_size_;
-  const int producer_consumer_distance_;
   Cell* buffer_;
   byte* positions_;
   ProducerPosition* producer_pos_;
index 0718a89..44e63fe 100644 (file)
@@ -110,18 +110,8 @@ class TickSampleEventRecord {
   // The parameterless constructor is used when we dequeue data from
   // the ticks buffer.
   TickSampleEventRecord() { }
-  explicit TickSampleEventRecord(unsigned order)
-      : filler(1),
-        order(order) {
-    ASSERT(filler != SamplingCircularQueue::kClear);
-  }
+  explicit TickSampleEventRecord(unsigned order) : order(order) { }
 
-  // The first machine word of a TickSampleEventRecord must not ever
-  // become equal to SamplingCircularQueue::kClear.  As both order and
-  // TickSample's first field are not reliable in this sense (order
-  // can overflow, TickSample can have all fields reset), we are
-  // forced to use an artificial filler field.
-  int filler;
   unsigned order;
   TickSample sample;
 
index 12b593f..4d7856e 100644 (file)
@@ -42,8 +42,6 @@ TEST(SamplingCircularQueue) {
                             3);
 
   // Check that we are using non-reserved values.
-  CHECK_NE(SamplingCircularQueue::kClear, 1);
-  CHECK_NE(SamplingCircularQueue::kEnd, 1);
   // Fill up the first chunk.
   CHECK_EQ(NULL, scq.StartDequeue());
   for (Record i = 1; i < 1 + kRecordsPerChunk; ++i) {
@@ -153,8 +151,6 @@ TEST(SamplingCircularQueueMultithreading) {
   scq.FlushResidualRecords();
 
   // Check that we are using non-reserved values.
-  CHECK_NE(SamplingCircularQueue::kClear, 1);
-  CHECK_NE(SamplingCircularQueue::kEnd, 1);
   ProducerThread producer1(&scq, kRecordsPerChunk, 1, semaphore);
   ProducerThread producer2(&scq, kRecordsPerChunk, 10, semaphore);
   ProducerThread producer3(&scq, kRecordsPerChunk, 20, semaphore);